diff --git a/examples/memory/compaction_session_example.py b/examples/memory/compaction_session_example.py
new file mode 100644
index 000000000..c84822e30
--- /dev/null
+++ b/examples/memory/compaction_session_example.py
@@ -0,0 +1,64 @@
+"""
+Example demonstrating OpenAI responses.compact session functionality.
+
+This example shows how to use OpenAIResponsesCompactionSession to automatically
+compact conversation history when it grows too large, reducing token usage
+while preserving context.
+"""
+
+import asyncio
+
+from agents import Agent, OpenAIResponsesCompactionSession, Runner, SQLiteSession
+
+
+async def main():
+    # Create an underlying session for storage (in-memory SQLite by default)
+    underlying = SQLiteSession("demo-session")
+
+    # Wrap it with a compaction session - history is compacted automatically when the threshold is hit
+    session = OpenAIResponsesCompactionSession(
+        session_id="demo-session",
+        underlying_session=underlying,
+        model="gpt-4.1",
+        # Custom compaction trigger (the default is 10 candidates)
+        should_trigger_compaction=lambda ctx: len(ctx["compaction_candidate_items"]) >= 4,
+    )
+
+    agent = Agent(
+        name="Assistant",
+        instructions="Reply concisely. Keep answers to 1-2 sentences.",
+    )
+
+    print("=== Compaction Session Example ===\n")
+
+    prompts = [
+        "What is the tallest mountain in the world?",
+        "How tall is it in feet?",
+        "When was it first climbed?",
+        "Who was on that expedition?",
+        "What country is the mountain in?",
+    ]
+
+    for i, prompt in enumerate(prompts, 1):
+        print(f"Turn {i}:")
+        print(f"User: {prompt}")
+        result = await Runner.run(agent, prompt, session=session)
+        print(f"Assistant: {result.final_output}\n")
+
+    # Show the final session state
+    items = await session.get_items()
+    print("=== Final Session State ===")
+    print(f"Total items: {len(items)}")
+    for item in items:
+        item_type = item.get("type", "unknown")
+        if item_type == "compaction":
+            print("  - compaction (encrypted content)")
+        elif item_type == "message":
+            role = item.get("role", "unknown")
+            print(f"  - message ({role})")
+        else:
+            print(f"  - {item_type}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
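Beyond the automatic trigger wired into the runner (see the run.py changes below), compaction can also be invoked by hand. A minimal sketch, reusing the agent and session from the example above and assuming the run result exposes the latest response ID as `result.last_response_id`:

```python
from agents import Agent, OpenAIResponsesCompactionSession, Runner


async def compact_now(
    agent: Agent, session: OpenAIResponsesCompactionSession, prompt: str
) -> None:
    # Run one turn, then compact regardless of the candidate threshold.
    # force=True bypasses the should_trigger_compaction decision hook.
    result = await Runner.run(agent, prompt, session=session)
    if result.last_response_id is not None:
        await session.run_compaction(
            {"response_id": result.last_response_id, "force": True}
        )
```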
diff --git a/src/agents/__init__.py b/src/agents/__init__.py
index 51cd09e66..572e73099 100644
--- a/src/agents/__init__.py
+++ b/src/agents/__init__.py
@@ -49,6 +49,7 @@
     set_conversation_history_wrappers,
 )
 from .items import (
+    CompactionItem,
     HandoffCallItem,
     HandoffOutputItem,
     ItemHelpers,
@@ -63,9 +64,13 @@
 from .lifecycle import AgentHooks, RunHooks
 from .memory import (
     OpenAIConversationsSession,
+    OpenAIResponsesCompactionArgs,
+    OpenAIResponsesCompactionAwareSession,
+    OpenAIResponsesCompactionSession,
     Session,
     SessionABC,
     SQLiteSession,
+    is_openai_responses_compaction_aware_session,
 )
 from .model_settings import ModelSettings
 from .models.interface import Model, ModelProvider, ModelTracing
@@ -291,6 +296,11 @@ def enable_verbose_stdout_logging():
     "SessionABC",
     "SQLiteSession",
     "OpenAIConversationsSession",
+    "OpenAIResponsesCompactionSession",
+    "OpenAIResponsesCompactionArgs",
+    "OpenAIResponsesCompactionAwareSession",
+    "is_openai_responses_compaction_aware_session",
+    "CompactionItem",
     "AgentHookContext",
     "RunContextWrapper",
     "TContext",
diff --git a/src/agents/_run_impl.py b/src/agents/_run_impl.py
index 54fceef57..f90146e0e 100644
--- a/src/agents/_run_impl.py
+++ b/src/agents/_run_impl.py
@@ -57,6 +57,7 @@
 from .guardrail import InputGuardrail, InputGuardrailResult, OutputGuardrail, OutputGuardrailResult
 from .handoffs import Handoff, HandoffInputData, nest_handoff_history
 from .items import (
+    CompactionItem,
     HandoffCallItem,
     HandoffOutputItem,
     ItemHelpers,
@@ -539,6 +540,9 @@ def process_model_response(
                 logger.debug("Queuing shell_call %s", call_identifier)
                 shell_calls.append(ToolRunShellCall(tool_call=output, shell_tool=shell_tool))
                 continue
+            if output_type == "compaction":
+                items.append(CompactionItem(raw_item=cast(dict[str, Any], output), agent=agent))
+                continue
             if output_type == "apply_patch_call":
                 items.append(ToolCallItem(raw_item=cast(Any, output), agent=agent))
                 if apply_patch_tool:
diff --git a/src/agents/items.py b/src/agents/items.py
index 991a7f877..4845344e3 100644
--- a/src/agents/items.py
+++ b/src/agents/items.py
@@ -327,6 +327,23 @@ class MCPApprovalResponseItem(RunItemBase[McpApprovalResponse]):
     type: Literal["mcp_approval_response_item"] = "mcp_approval_response_item"
 
 
+@dataclass
+class CompactionItem:
+    """Represents a compaction item from responses.compact."""
+
+    agent: Agent[Any]
+    """The agent whose run caused this item to be generated."""
+
+    raw_item: dict[str, Any]
+    """The raw compaction item containing encrypted_content."""
+
+    type: Literal["compaction_item"] = "compaction_item"
+
+    def to_input_item(self) -> TResponseInputItem:
+        """Converts this item into an input item suitable for passing to the model."""
+        return cast(TResponseInputItem, self.raw_item)
+
+
 RunItem: TypeAlias = Union[
     MessageOutputItem,
     HandoffCallItem,
@@ -337,6 +354,7 @@ class MCPApprovalResponseItem(RunItemBase[McpApprovalResponse]):
     MCPListToolsItem,
     MCPApprovalRequestItem,
     MCPApprovalResponseItem,
+    CompactionItem,
 ]
 """An item generated by an agent."""
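For orientation, `CompactionItem` wraps the raw Responses payload unchanged. A small sketch of constructing one by hand; the exact payload shape, including the `encrypted_content` key, is an assumption based on the docstrings above (real compaction items are produced server-side by responses.compact):

```python
from agents import Agent, CompactionItem

agent = Agent(name="Assistant")

# Hypothetical raw payload; in practice the server returns opaque encrypted content.
raw = {"type": "compaction", "encrypted_content": "<opaque>"}

item = CompactionItem(raw_item=raw, agent=agent)
assert item.type == "compaction_item"
assert item.to_input_item() is raw  # pass-through, reusable as model input
```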
diff --git a/src/agents/memory/__init__.py b/src/agents/memory/__init__.py
index 1db1598ac..e1a06156e 100644
--- a/src/agents/memory/__init__.py
+++ b/src/agents/memory/__init__.py
@@ -1,5 +1,12 @@
 from .openai_conversations_session import OpenAIConversationsSession
-from .session import Session, SessionABC
+from .openai_responses_compaction_session import OpenAIResponsesCompactionSession
+from .session import (
+    OpenAIResponsesCompactionArgs,
+    OpenAIResponsesCompactionAwareSession,
+    Session,
+    SessionABC,
+    is_openai_responses_compaction_aware_session,
+)
 from .sqlite_session import SQLiteSession
 from .util import SessionInputCallback
 
@@ -9,4 +16,8 @@
     "SessionInputCallback",
     "SQLiteSession",
     "OpenAIConversationsSession",
+    "OpenAIResponsesCompactionSession",
+    "OpenAIResponsesCompactionArgs",
+    "OpenAIResponsesCompactionAwareSession",
+    "is_openai_responses_compaction_aware_session",
 ]
diff --git a/src/agents/memory/openai_responses_compaction_session.py b/src/agents/memory/openai_responses_compaction_session.py
new file mode 100644
index 000000000..e23c59093
--- /dev/null
+++ b/src/agents/memory/openai_responses_compaction_session.py
@@ -0,0 +1,215 @@
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, Any, Callable
+
+from openai import AsyncOpenAI
+
+from ..models._openai_shared import get_default_openai_client
+from .openai_conversations_session import OpenAIConversationsSession
+from .session import (
+    OpenAIResponsesCompactionArgs,
+    OpenAIResponsesCompactionAwareSession,
+    SessionABC,
+)
+
+if TYPE_CHECKING:
+    from ..items import TResponseInputItem
+    from .session import Session
+
+logger = logging.getLogger("openai-agents.openai.compaction")
+
+DEFAULT_COMPACTION_THRESHOLD = 10
+
+
+def select_compaction_candidate_items(
+    items: list[TResponseInputItem],
+) -> list[TResponseInputItem]:
+    """Select compaction candidate items.
+
+    Excludes user messages and compaction items.
+    """
+    return [
+        item
+        for item in items
+        if not (
+            (item.get("type") == "message" and item.get("role") == "user")
+            or item.get("type") == "compaction"
+        )
+    ]
+
+
+def default_should_trigger_compaction(context: dict[str, Any]) -> bool:
+    """Default decision: compact once DEFAULT_COMPACTION_THRESHOLD candidate items exist."""
+    return len(context["compaction_candidate_items"]) >= DEFAULT_COMPACTION_THRESHOLD
+
+
+def is_openai_model_name(model: str) -> bool:
+    """Check whether a model name follows OpenAI naming conventions."""
+    trimmed = model.strip()
+    if not trimmed:
+        return False
+
+    # Handle fine-tuned models: ft:gpt-4.1:org:proj:suffix
+    without_ft_prefix = trimmed[3:] if trimmed.startswith("ft:") else trimmed
+    root = without_ft_prefix.split(":", 1)[0]
+
+    # Allow gpt-* and o* models
+    if root.startswith("gpt-"):
+        return True
+    if root.startswith("o") and root[1:2].isdigit():
+        return True
+
+    return False
+
+
+class OpenAIResponsesCompactionSession(SessionABC, OpenAIResponsesCompactionAwareSession):
+    """Session decorator that triggers responses.compact when stored history grows.
+
+    Works with OpenAI Responses API models only. Wraps any Session (except
+    OpenAIConversationsSession) and automatically calls the OpenAI responses.compact
+    API after each turn when the decision hook returns True.
+    """
+
+    def __init__(
+        self,
+        session_id: str,
+        underlying_session: Session,
+        *,
+        client: AsyncOpenAI | None = None,
+        model: str = "gpt-4.1",
+        should_trigger_compaction: Callable[[dict[str, Any]], bool] | None = None,
+    ):
+        """Initialize the compaction session.
+
+        Args:
+            session_id: Identifier for this session.
+            underlying_session: Session store that holds the compacted history. Cannot be
+                OpenAIConversationsSession.
+            client: OpenAI client for responses.compact API calls. Defaults to
+                get_default_openai_client() or a new AsyncOpenAI().
+            model: Model to use for responses.compact. Defaults to "gpt-4.1". Must be an
+                OpenAI model name (gpt-*, o*, or ft:gpt-*).
+            should_trigger_compaction: Custom decision hook. Defaults to triggering when
+                10+ compaction candidates exist.
+        """
+        if isinstance(underlying_session, OpenAIConversationsSession):
+            raise ValueError(
+                "OpenAIResponsesCompactionSession cannot wrap OpenAIConversationsSession "
+                "because it manages its own history on the server."
+            )
+
+        if not is_openai_model_name(model):
+            raise ValueError(f"Unsupported model for OpenAI responses compaction: {model}")
+
+        self.session_id = session_id
+        self.underlying_session = underlying_session
+        self._client = client
+        self.model = model
+        self.should_trigger_compaction = (
+            should_trigger_compaction or default_should_trigger_compaction
+        )
+
+        # Cache for incremental candidate tracking
+        self._compaction_candidate_items: list[TResponseInputItem] | None = None
+        self._session_items: list[TResponseInputItem] | None = None
+        self._response_id: str | None = None
+
+    @property
+    def client(self) -> AsyncOpenAI:
+        if self._client is None:
+            self._client = get_default_openai_client() or AsyncOpenAI()
+        return self._client
+
+    async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None) -> None:
+        """Run compaction using the responses.compact API."""
+        if args and args.get("response_id"):
+            self._response_id = args["response_id"]
+
+        if not self._response_id:
+            raise ValueError(
+                "OpenAIResponsesCompactionSession.run_compaction requires a response_id"
+            )
+
+        compaction_candidate_items, session_items = await self._ensure_compaction_candidates()
+
+        force = args.get("force", False) if args else False
+        should_compact = force or self.should_trigger_compaction(
+            {
+                "response_id": self._response_id,
+                "compaction_candidate_items": compaction_candidate_items,
+                "session_items": session_items,
+            }
+        )
+
+        if not should_compact:
+            logger.debug(f"skip: decision hook declined compaction for {self._response_id}")
+            return
+
+        logger.debug(f"compact: start for {self._response_id} using {self.model}")
+
+        compacted = await self.client.responses.compact(
+            previous_response_id=self._response_id,
+            model=self.model,
+        )
+
+        await self.underlying_session.clear_session()
+        output_items: list[TResponseInputItem] = []
+        if compacted.output:
+            for item in compacted.output:
+                if isinstance(item, dict):
+                    output_items.append(item)
+                else:
+                    output_items.append(item.model_dump(exclude_unset=True))  # type: ignore
+
+        if output_items:
+            await self.underlying_session.add_items(output_items)
+
+        self._compaction_candidate_items = select_compaction_candidate_items(output_items)
+        self._session_items = output_items
+
+        logger.debug(
+            f"compact: done for {self._response_id} "
+            f"(output={len(output_items)}, candidates={len(self._compaction_candidate_items)})"
+        )
+
+    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
+        return await self.underlying_session.get_items(limit)
+
+    async def add_items(self, items: list[TResponseInputItem]) -> None:
+        await self.underlying_session.add_items(items)
+        if self._compaction_candidate_items is not None:
+            new_candidates = select_compaction_candidate_items(items)
+            if new_candidates:
+                self._compaction_candidate_items.extend(new_candidates)
+        if self._session_items is not None:
+            self._session_items.extend(items)
+
+    async def pop_item(self) -> TResponseInputItem | None:
+        popped = await self.underlying_session.pop_item()
+        if popped is not None:
+            # Invalidate the caches; they are rebuilt lazily on next use.
+            self._compaction_candidate_items = None
+            self._session_items = None
+        return popped
+
+    async def clear_session(self) -> None:
+        await self.underlying_session.clear_session()
+        self._compaction_candidate_items = []
+        self._session_items = []
+
+    async def _ensure_compaction_candidates(
+        self,
+    ) -> tuple[list[TResponseInputItem], list[TResponseInputItem]]:
+        """Lazily load and cache compaction candidates."""
+        if self._compaction_candidate_items is not None and self._session_items is not None:
+            return (self._compaction_candidate_items[:], self._session_items[:])
+
+        history = await self.underlying_session.get_items()
+        candidates = select_compaction_candidate_items(history)
+        self._compaction_candidate_items = candidates
+        self._session_items = history
+
+        logger.debug(
+            f"candidates: initialized (history={len(history)}, candidates={len(candidates)})"
+        )
+        return (candidates[:], history[:])
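Because the decision hook is just a callable over a plain dict, triggers other than the item-count default are easy to plug in. A sketch of a size-based trigger, using the context keys `run_compaction` passes (`response_id`, `compaction_candidate_items`, `session_items`); the 50 KB figure is an arbitrary illustration:

```python
import json
from typing import Any


def compact_when_history_is_large(context: dict[str, Any]) -> bool:
    # Compact once the serialized session history exceeds roughly 50 KB.
    serialized = json.dumps(context["session_items"], default=str)
    return len(serialized) > 50_000


# session = OpenAIResponsesCompactionSession(
#     session_id="demo",
#     underlying_session=underlying,
#     should_trigger_compaction=compact_when_history_is_large,
# )
```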
diff --git a/src/agents/memory/session.py b/src/agents/memory/session.py
index 9c85af6dd..bb92c8654 100644
--- a/src/agents/memory/session.py
+++ b/src/agents/memory/session.py
@@ -3,6 +3,8 @@
 from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING, Protocol, runtime_checkable
 
+from typing_extensions import TypedDict, TypeGuard
+
 if TYPE_CHECKING:
     from ..items import TResponseInputItem
 
@@ -97,3 +99,29 @@ async def pop_item(self) -> TResponseInputItem | None:
     async def clear_session(self) -> None:
         """Clear all items for this session."""
         ...
+
+
+class OpenAIResponsesCompactionArgs(TypedDict, total=False):
+    """Arguments for the run_compaction method."""
+
+    response_id: str
+    """The ID of the last response to use for compaction."""
+
+    force: bool
+    """Whether to force compaction even if the threshold is not met."""
+
+
+@runtime_checkable
+class OpenAIResponsesCompactionAwareSession(Session, Protocol):
+    """Protocol for session implementations that support responses compaction."""
+
+    async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None) -> None:
+        """Run the compaction process for the session."""
+        ...
+
+
+def is_openai_responses_compaction_aware_session(
+    session: Session | None,
+) -> TypeGuard[OpenAIResponsesCompactionAwareSession]:
+    """Check if a session supports responses compaction."""
+    return isinstance(session, OpenAIResponsesCompactionAwareSession)
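As a usage sketch, the type guard lets code that only holds a plain `Session` call `run_compaction` safely, narrowing the type for static checkers along the way; this mirrors what the runner hook in run.py below does:

```python
from agents.memory import Session, is_openai_responses_compaction_aware_session


async def maybe_compact(session: Session | None, response_id: str) -> None:
    # The TypeGuard narrows `session` to OpenAIResponsesCompactionAwareSession.
    if is_openai_responses_compaction_aware_session(session):
        await session.run_compaction({"response_id": response_id})
```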
diff --git a/src/agents/models/chatcmpl_converter.py b/src/agents/models/chatcmpl_converter.py
index abdfa3047..1f21e710c 100644
--- a/src/agents/models/chatcmpl_converter.py
+++ b/src/agents/models/chatcmpl_converter.py
@@ -582,7 +582,14 @@ def ensure_assistant_message() -> ChatCompletionAssistantMessageParam:
             # This preserves the original behavior
             pending_thinking_blocks = reconstructed_thinking_blocks
 
-        # 8) If we haven't recognized it => fail or ignore
+        # 8) compaction items => reject for chat completions
+        elif isinstance(item, dict) and item.get("type") == "compaction":
+            raise UserError(
+                "Compaction items are not supported for chat completions. "
+                "Please use the Responses API to handle compaction."
+            )
+
+        # 9) If we haven't recognized it => fail or ignore
         else:
             raise UserError(f"Unhandled item type or structure: {item}")
 
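A quick sketch of the failure mode this converter change guards against, assuming `Converter.items_to_messages` is the entry point that walks these items (as in the current converter) and the same hypothetical payload shape as above:

```python
import pytest

from agents.exceptions import UserError
from agents.models.chatcmpl_converter import Converter

# Chat Completions has no way to replay server-side encrypted compaction
# state, so the converter refuses rather than silently dropping it.
with pytest.raises(UserError):
    Converter.items_to_messages([{"type": "compaction", "encrypted_content": "..."}])
```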
diff --git a/src/agents/run.py b/src/agents/run.py
index 5b5e6fdfa..a3bf00c93 100644
--- a/src/agents/run.py
+++ b/src/agents/run.py
@@ -59,7 +59,7 @@
 )
 from .lifecycle import AgentHooksBase, RunHooks, RunHooksBase
 from .logger import logger
-from .memory import Session, SessionInputCallback
+from .memory import Session, SessionInputCallback, is_openai_responses_compaction_aware_session
 from .model_settings import ModelSettings
 from .models.interface import Model, ModelProvider
 from .models.multi_provider import MultiProvider
@@ -736,7 +736,10 @@ async def run(
                     for guardrail_result in input_guardrail_results
                 ):
                     await self._save_result_to_session(
-                        session, [], turn_result.new_step_items
+                        session,
+                        [],
+                        turn_result.new_step_items,
+                        turn_result.model_response.response_id,
                     )
                     return result
@@ -748,7 +751,10 @@
                     for guardrail_result in input_guardrail_results
                 ):
                     await self._save_result_to_session(
-                        session, [], turn_result.new_step_items
+                        session,
+                        [],
+                        turn_result.new_step_items,
+                        turn_result.model_response.response_id,
                     )
                     current_agent = cast(Agent[TContext], turn_result.next_step.new_agent)
                     current_span.finish(reset_current=True)
@@ -760,7 +766,10 @@
                     for guardrail_result in input_guardrail_results
                 ):
                     await self._save_result_to_session(
-                        session, [], turn_result.new_step_items
+                        session,
+                        [],
+                        turn_result.new_step_items,
+                        turn_result.model_response.response_id,
                     )
                 else:
                     raise AgentsException(
@@ -1229,7 +1238,10 @@ async def _start_streaming(
                         )
                         if should_skip_session_save is False:
                             await AgentRunner._save_result_to_session(
-                                session, [], turn_result.new_step_items
+                                session,
+                                [],
+                                turn_result.new_step_items,
+                                turn_result.model_response.response_id,
                             )
 
                             current_agent = turn_result.next_step.new_agent
@@ -1275,7 +1287,10 @@ async def _start_streaming(
                 )
                 if should_skip_session_save is False:
                     await AgentRunner._save_result_to_session(
-                        session, [], turn_result.new_step_items
+                        session,
+                        [],
+                        turn_result.new_step_items,
+                        turn_result.model_response.response_id,
                     )
 
                 streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
@@ -1288,7 +1303,10 @@ async def _start_streaming(
                 )
                 if should_skip_session_save is False:
                     await AgentRunner._save_result_to_session(
-                        session, [], turn_result.new_step_items
+                        session,
+                        [],
+                        turn_result.new_step_items,
+                        turn_result.model_response.response_id,
                     )
 
                 # Check for soft cancel after turn completion
@@ -2006,6 +2024,7 @@ async def _save_result_to_session(
         session: Session | None,
         original_input: str | list[TResponseInputItem],
         new_items: list[RunItem],
+        response_id: str | None = None,
     ) -> None:
         """
         Save the conversation turn to session.
@@ -2025,6 +2044,10 @@ async def _save_result_to_session(
         items_to_save = input_list + new_items_as_input
         await session.add_items(items_to_save)
 
+        # Run compaction if the session supports it and we have a response_id
+        if response_id and is_openai_responses_compaction_aware_session(session):
+            await session.run_compaction({"response_id": response_id})
+
     @staticmethod
     async def _input_guardrail_tripwire_triggered_for_stream(
         streamed_result: RunResultStreaming,
diff --git a/tests/memory/test_openai_responses_compaction_session.py b/tests/memory/test_openai_responses_compaction_session.py
new file mode 100644
index 000000000..204dbcb11
--- /dev/null
+++ b/tests/memory/test_openai_responses_compaction_session.py
@@ -0,0 +1,229 @@
+from __future__ import annotations
+
+from typing import cast
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from agents.items import TResponseInputItem
+from agents.memory import (
+    OpenAIResponsesCompactionSession,
+    Session,
+    is_openai_responses_compaction_aware_session,
+)
+from agents.memory.openai_responses_compaction_session import (
+    DEFAULT_COMPACTION_THRESHOLD,
+    is_openai_model_name,
+    select_compaction_candidate_items,
+)
+
+
+class TestIsOpenAIModelName:
+    def test_gpt_models(self) -> None:
+        assert is_openai_model_name("gpt-4o") is True
+        assert is_openai_model_name("gpt-4o-mini") is True
+        assert is_openai_model_name("gpt-3.5-turbo") is True
+        assert is_openai_model_name("gpt-4.1") is True
+        assert is_openai_model_name("gpt-5") is True
+        assert is_openai_model_name("gpt-5.2") is True
+        assert is_openai_model_name("gpt-5-mini") is True
+        assert is_openai_model_name("gpt-5-nano") is True
+
+    def test_o_models(self) -> None:
+        assert is_openai_model_name("o1") is True
+        assert is_openai_model_name("o1-preview") is True
+        assert is_openai_model_name("o3") is True
+
+    def test_fine_tuned_models(self) -> None:
+        assert is_openai_model_name("ft:gpt-4o-mini:org:proj:suffix") is True
+        assert is_openai_model_name("ft:gpt-4.1:my-org::id") is True
+
+    def test_invalid_models(self) -> None:
+        assert is_openai_model_name("") is False
+        assert is_openai_model_name("not-openai") is False
+
+
+class TestSelectCompactionCandidateItems:
+    def test_excludes_user_messages(self) -> None:
+        items: list[TResponseInputItem] = [
+            cast(TResponseInputItem, {"type": "message", "role": "user", "content": "hello"}),
+            cast(TResponseInputItem, {"type": "message", "role": "assistant", "content": "hi"}),
+        ]
+        result = select_compaction_candidate_items(items)
+        assert len(result) == 1
+        assert result[0].get("role") == "assistant"
+
+    def test_excludes_compaction_items(self) -> None:
+        items: list[TResponseInputItem] = [
+            cast(TResponseInputItem, {"type": "compaction", "summary": "..."}),
+            cast(TResponseInputItem, {"type": "message", "role": "assistant", "content": "hi"}),
+        ]
+        result = select_compaction_candidate_items(items)
+        assert len(result) == 1
+        assert result[0].get("type") == "message"
+
+
+class TestOpenAIResponsesCompactionSession:
+    def create_mock_session(self) -> MagicMock:
+        mock = MagicMock(spec=Session)
+        mock.session_id = "test-session"
+        mock.get_items = AsyncMock(return_value=[])
+        mock.add_items = AsyncMock()
+        mock.pop_item = AsyncMock(return_value=None)
+        mock.clear_session = AsyncMock()
+        return mock
+
+    def test_init_validates_model(self) -> None:
+        mock_session = self.create_mock_session()
+
+        with pytest.raises(ValueError, match="Unsupported model"):
+            OpenAIResponsesCompactionSession(
+                session_id="test",
+                underlying_session=mock_session,
+                model="claude-3",
+            )
+
+    def test_init_accepts_valid_model(self) -> None:
+        mock_session = self.create_mock_session()
+        session = OpenAIResponsesCompactionSession(
+            session_id="test",
+            underlying_session=mock_session,
+            model="gpt-4.1",
+        )
+        assert session.model == "gpt-4.1"
+
+    @pytest.mark.asyncio
+    async def test_add_items_delegates(self) -> None:
+        mock_session = self.create_mock_session()
+        session = OpenAIResponsesCompactionSession(
+            session_id="test",
+            underlying_session=mock_session,
+        )
+
+        items: list[TResponseInputItem] = [
+            cast(TResponseInputItem, {"type": "message", "role": "assistant", "content": "test"})
+        ]
+        await session.add_items(items)
+
+        mock_session.add_items.assert_called_once_with(items)
+
+    @pytest.mark.asyncio
+    async def test_get_items_delegates(self) -> None:
+        mock_session = self.create_mock_session()
+        mock_session.get_items.return_value = [{"type": "message", "content": "test"}]
+
+        session = OpenAIResponsesCompactionSession(
+            session_id="test",
+            underlying_session=mock_session,
+        )
+
+        result = await session.get_items()
+        assert len(result) == 1
+        mock_session.get_items.assert_called_once()
+
+    @pytest.mark.asyncio
+    async def test_run_compaction_requires_response_id(self) -> None:
+        mock_session = self.create_mock_session()
+        session = OpenAIResponsesCompactionSession(
+            session_id="test",
+            underlying_session=mock_session,
+        )
+
+        with pytest.raises(ValueError, match="requires a response_id"):
+            await session.run_compaction()
+
+    @pytest.mark.asyncio
+    async def test_run_compaction_skips_when_below_threshold(self) -> None:
+        mock_session = self.create_mock_session()
+        # Return fewer than threshold items
+        mock_session.get_items.return_value = [
+            cast(
+                TResponseInputItem,
+                {"type": "message", "role": "assistant", "content": f"msg{i}"},
+            )
+            for i in range(DEFAULT_COMPACTION_THRESHOLD - 1)
+        ]
+
+        mock_client = MagicMock()
+        session = OpenAIResponsesCompactionSession(
+            session_id="test",
+            underlying_session=mock_session,
+            client=mock_client,
+        )
+
+        await session.run_compaction({"response_id": "resp-123"})
+
+        # Should not have called the compact API
+        mock_client.responses.compact.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_run_compaction_executes_when_threshold_met(self) -> None:
+        mock_session = self.create_mock_session()
+        # Return exactly threshold items (all assistant messages = candidates)
+        mock_session.get_items.return_value = [
+            cast(
+                TResponseInputItem,
+                {"type": "message", "role": "assistant", "content": f"msg{i}"},
+            )
+            for i in range(DEFAULT_COMPACTION_THRESHOLD)
+        ]
+
+        mock_compact_response = MagicMock()
+        mock_compact_response.output = [{"type": "compaction", "summary": "compacted"}]
+
+        mock_client = MagicMock()
+        mock_client.responses.compact = AsyncMock(return_value=mock_compact_response)
+
+        session = OpenAIResponsesCompactionSession(
+            session_id="test",
+            underlying_session=mock_session,
+            client=mock_client,
+            model="gpt-4.1",
+        )
+
+        await session.run_compaction({"response_id": "resp-123"})
+
+        mock_client.responses.compact.assert_called_once_with(
+            previous_response_id="resp-123",
+            model="gpt-4.1",
+        )
+        mock_session.clear_session.assert_called_once()
+        mock_session.add_items.assert_called()
+
+    @pytest.mark.asyncio
+    async def test_run_compaction_force_bypasses_threshold(self) -> None:
+        mock_session = self.create_mock_session()
+        mock_session.get_items.return_value = []
+
+        mock_compact_response = MagicMock()
+        mock_compact_response.output = []
+
+        mock_client = MagicMock()
+        mock_client.responses.compact = AsyncMock(return_value=mock_compact_response)
+
+        session = OpenAIResponsesCompactionSession(
+            session_id="test",
+            underlying_session=mock_session,
+            client=mock_client,
+        )
+
+        await session.run_compaction({"response_id": "resp-123", "force": True})
+
+        mock_client.responses.compact.assert_called_once()
+
+
+class TestTypeGuard:
+    def test_is_compaction_aware_session_true(self) -> None:
+        mock_underlying = MagicMock(spec=Session)
+        mock_underlying.session_id = "test"
+        mock_underlying.get_items = AsyncMock(return_value=[])
+        mock_underlying.add_items = AsyncMock()
+        mock_underlying.pop_item = AsyncMock(return_value=None)
+        mock_underlying.clear_session = AsyncMock()
+
+        session = OpenAIResponsesCompactionSession(
+            session_id="test",
+            underlying_session=mock_underlying,
+        )
+        assert is_openai_responses_compaction_aware_session(session) is True
+
+    def test_is_compaction_aware_session_false(self) -> None:
+        mock_session = MagicMock(spec=Session)
+        assert is_openai_responses_compaction_aware_session(mock_session) is False
+
+    def test_is_compaction_aware_session_none(self) -> None:
+        assert is_openai_responses_compaction_aware_session(None) is False