Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 97 additions & 31 deletions amplifier_module_loop_streaming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,25 +267,52 @@ async def _execute_stream(
"Appended ephemeral injection to last tool result message"
)
else:
# Fall back to new message if last message isn't a tool result
# Fall back: merge if same role, else new message
if last_msg.get("role") == result.context_injection_role:
original_content = last_msg.get("content", "")
message_dicts[-1] = {
**last_msg,
"content": f"{original_content}\n\n{result.context_injection}",
}
logger.debug(
"Merged ephemeral injection into last message (same role: %s)",
result.context_injection_role,
)
else:
message_dicts.append(
{
"role": result.context_injection_role,
"content": result.context_injection,
}
)
logger.debug(
f"Last message role is '{last_msg.get('role')}', not 'tool' - "
"created new message for injection"
)
else:
# Structural prevention: merge into last message if same role
# to avoid consecutive messages with the same role (confuses models)
if (
len(message_dicts) > 0
and message_dicts[-1].get("role") == result.context_injection_role
):
last_msg = message_dicts[-1]
original_content = last_msg.get("content", "")
message_dicts[-1] = {
**last_msg,
"content": f"{original_content}\n\n{result.context_injection}",
}
logger.debug(
"Merged ephemeral injection into last message (same role: %s)",
result.context_injection_role,
)
else:
message_dicts.append(
{
"role": result.context_injection_role,
"content": result.context_injection,
}
)
logger.debug(
f"Last message role is '{last_msg.get('role')}', not 'tool' - "
"created new message for injection"
)
else:
# Default behavior: append as new message
message_dicts.append(
{
"role": result.context_injection_role,
"content": result.context_injection,
}
)

# Apply pending ephemeral injections from tool:post hooks
if self._pending_ephemeral_injections:
Expand All @@ -305,22 +332,50 @@ async def _execute_stream(
"Applied pending ephemeral injection to last tool result"
)
else:
message_dicts.append(
{
"role": injection["role"],
"content": injection["content"],
# Merge if same role as last message
if last_msg.get("role") == injection["role"]:
original_content = last_msg.get("content", "")
message_dicts[-1] = {
**last_msg,
"content": f"{original_content}\n\n{injection['content']}",
}
logger.debug(
"Merged pending injection into last message (same role: %s)",
injection["role"],
)
else:
message_dicts.append(
{
"role": injection["role"],
"content": injection["content"],
}
)
logger.debug(
"Last message not a tool result, created new message for injection"
)
else:
# Structural prevention: merge if same role as last message
if (
len(message_dicts) > 0
and message_dicts[-1].get("role") == injection["role"]
):
last_msg = message_dicts[-1]
original_content = last_msg.get("content", "")
message_dicts[-1] = {
**last_msg,
"content": f"{original_content}\n\n{injection['content']}",
}
logger.debug(
"Merged pending ephemeral injection into last message (same role: %s)",
injection["role"],
)
else:
message_dicts.append(
{"role": injection["role"], "content": injection["content"]}
)
logger.debug(
"Last message not a tool result, created new message for injection"
"Applied pending ephemeral injection as new message"
)
else:
message_dicts.append(
{"role": injection["role"], "content": injection["content"]}
)
logger.debug(
"Applied pending ephemeral injection as new message"
)
# Clear pending injections after applying
self._pending_ephemeral_injections = []

Expand Down Expand Up @@ -669,16 +724,27 @@ async def _execute_stream(
# Get one final response with the reminder (via _execute_stream helper)
message_dicts = await context.get_messages_for_request(provider=provider)
message_dicts = list(message_dicts)
message_dicts.append(
{
"role": "user",
"content": """<system-reminder source="orchestrator-loop-limit">
# Merge loop-limit reminder into last message if same role,
# to avoid consecutive user messages that confuse role attribution
_loop_limit_content = """<system-reminder source="orchestrator-loop-limit">
You have reached the maximum number of iterations for this turn. Please provide a response to the user now, summarizing your progress and noting what remains to be done. You can continue in the next turn if needed.

DO NOT mention this iteration limit or reminder to the user explicitly. Simply wrap up naturally.
</system-reminder>""",
</system-reminder>"""
if (
len(message_dicts) > 0
and message_dicts[-1].get("role") == "user"
):
last_msg = message_dicts[-1]
original_content = last_msg.get("content", "")
message_dicts[-1] = {
**last_msg,
"content": f"{original_content}\n\n{_loop_limit_content}",
}
)
else:
message_dicts.append(
{"role": "user", "content": _loop_limit_content}
)

try:
# Convert dicts to ChatRequest
Expand Down