Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 303 additions & 16 deletions OpenWebUi Function v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import json
import random
import asyncio
from typing import Optional, List, Dict, Any
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime
import logging
import traceback
Expand Down Expand Up @@ -56,12 +56,20 @@ class Valves(BaseModel):
)
ENABLE_PROMPT_ENRICHMENT: bool = Field(
default=False,
description="Use GPT-4.1 to enrich prompts before deep research",
description="Use gpt-5-chat-latest to enrich prompts before deep research",
)
ENABLE_CLARIFICATION: bool = Field(
default=False,
description="Ask clarifying questions before research (requires user interaction)",
)
DEBUG_CLARIFICATION: bool = Field(
default=False,
description="When true, include clarification debug details in chat output",
)
CLARIFICATION_QUESTION_COUNT: int = Field(
default=3,
description="Number of clarifying questions to ask before research (min 3, max 5)",
)
MAX_TOOL_CALLS: Optional[int] = Field(
default=None,
description="Maximum number of tool calls to limit cost/latency",
Expand Down Expand Up @@ -96,7 +104,7 @@ def __init__(self):
self.valves = self.Valves()
# Use exact model names as they appear in the API
self.deep_research_models = ["o3-deep-research", "o4-mini-deep-research"]
self.enrichment_model = "gpt-4.1"
self.enrichment_model = "gpt-5-chat-latest"

def pipes(self):
res = []
Expand All @@ -106,7 +114,7 @@ def pipes(self):
return res

async def enrich_prompt(self, original_prompt: str, headers: dict) -> str:
"""Enrich the user prompt using GPT-4.1 for better research results"""
"""Enrich the user prompt using gpt-5-chat-latest for better research results"""

instructions = """
You will be given a research task by a user. Your job is to produce a set of
Expand Down Expand Up @@ -178,6 +186,104 @@ async def enrich_prompt(self, original_prompt: str, headers: dict) -> str:
logger.error(f"Error enriching prompt: {e}")
return original_prompt

async def generate_clarification_questions(
    self,
    user_prompt: str,
    headers: dict,
    desired_count: int,
    debug: bool = False,
) -> Tuple[List[str], Optional[str]]:
    """
    Generate clarifying questions for a research prompt via the enrichment model.

    Calls the ``/responses`` endpoint with ``self.enrichment_model`` and parses
    the returned list of questions. On any failure (non-200 status, missing or
    unparseable output, network error) a static fallback list is returned
    instead, so this method never raises.

    Args:
        user_prompt: The user's original research request.
        headers: HTTP headers (including authorization) for the API call.
        desired_count: Requested number of questions; clamped to the 3..5 range.
        debug: When True, the second tuple element carries debug details
            (raw API response, HTTP error text, or the fallback reason);
            otherwise it is None.

    Returns:
        ``(questions, debug_info)`` where ``questions`` contains between 3 and
        5 one-line question strings and ``debug_info`` is a string or None.
    """

    # Clamp to the documented min/max (see CLARIFICATION_QUESTION_COUNT valve).
    question_count = max(3, min(desired_count, 5))
    debug_info: Optional[str] = None

    instructions = f"""
You are assisting with a research task. Generate {question_count} concise clarifying questions
that help specify the user's request. The questions should:
- Be directly relevant to the user's goal
- Avoid yes/no when specifics are better
- Fit in one sentence

Return ONLY the questions as a bullet list starting with "- ".
"""

    payload = {
        "model": self.enrichment_model,
        "input": f"User request:\n{user_prompt}",
        "instructions": instructions,
    }

    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.post(
                f"{self.valves.BASE_URL}/responses",
                json=payload,
                headers=headers,
            )

        if response.status_code != 200:
            if debug:
                debug_info = f"HTTP {response.status_code}: {response.text}"
            raise Exception(f"HTTP {response.status_code}")

        result = response.json()
        if debug:
            try:
                # Cap the dump so a huge response can't flood the chat output.
                debug_info = json.dumps(result, indent=2)[:2000]
            except Exception:
                debug_info = str(result)

        output_text = result.get("output_text")

        if not output_text:
            # Fall back to the structured Responses-API output array:
            # messages carry a list of content items of type "output_text".
            output = result.get("output")
            if output and isinstance(output, list):
                text_items = []
                for item in output:
                    if isinstance(item, dict) and item.get("type") == "message":
                        for content_item in item.get("content", []):
                            if (
                                isinstance(content_item, dict)
                                and content_item.get("type") == "output_text"
                            ):
                                text_items.append(content_item.get("text", ""))
                output_text = "\n".join(t for t in text_items if t)

        if not output_text:
            raise Exception("Missing output_text")

        # Parse the list. The model is instructed to use "- " bullets, but
        # tolerate "*" / "•" bullets and "1." / "1)" numbered lists so a
        # well-formed reply in a slightly different format is not discarded.
        questions: List[str] = []
        for line in output_text.splitlines():
            stripped = line.strip()
            question = None
            if stripped.startswith(("-", "*", "•")):
                question = stripped.lstrip("-*•").strip()
            elif stripped[:1].isdigit():
                # e.g. "1. How recent ..." or "2) Which region ..."
                question = stripped.lstrip("0123456789").lstrip(".)").strip()
            if question:
                questions.append(question)

        if not questions:
            raise Exception("No questions parsed")

        return questions[:question_count], debug_info

    except Exception as e:
        # Never propagate: clarification is best-effort, so degrade to a
        # generic question set rather than failing the whole pipe.
        logger.warning(f"Falling back to default clarification questions: {e}")
        fallback = [
            "What is the primary goal or output you want from this research?",
            "Are there specific constraints (timeframe, budget, geography, industry)?",
            "Which sources or data types should be prioritized or avoided?",
            "How deep or technical should the research go?",
            "Do you need comparisons, recommendations, or just a summary?",
        ]
        # Keep any HTTP/debug detail captured before the failure.
        if debug and not debug_info:
            debug_info = f"fallback_reason={e}"
        return fallback[:question_count], debug_info

async def create_background_response(self, payload: dict, headers: dict) -> dict:
"""Create a background response and return the response object"""

Expand Down Expand Up @@ -551,20 +657,60 @@ async def pipe(self, body: dict, __user__: dict):
m["content"] for m in messages if m.get("role") == "system"
).strip()

# (b) The latest user message (unchanged logic, but fall back gracefully)
user_input = next(
(
m["content"]
if isinstance(m["content"], str)
else "".join(p.get("text", "") for p in m["content"])
for m in reversed(messages)
if m.get("role") == "user"
),
None,
)
def _flatten_content(content: Any) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
return "".join(
p.get("text", "") if isinstance(p, dict) else str(p)
for p in content
)
return str(content) if content is not None else ""

# Collect all user messages in order
user_messages = [
(idx, _flatten_content(m.get("content")))
for idx, m in enumerate(messages)
if m.get("role") == "user"
]

# Use the latest user message as the working input, but keep the first
# user message as the original question so we can recombine later.
user_input = user_messages[-1][1] if user_messages else None
if not user_input:
yield "Error: No user input found"
return
original_user_input = user_messages[0][1] if user_messages else user_input

# Try to detect if we already asked clarifications in a previous turn and the user just replied.
# This avoids re-asking the same questions when the UI doesn't populate clarification_answers.
asked_for_clarification = False
clarification_prompt_index = None
clarification_prompt_text = None
last_user_index = None
for idx, m in enumerate(messages):
if m.get("role") == "user":
last_user_index = idx
if last_user_index is not None:
for m in reversed(messages[: last_user_index]):
if m.get("role") != "assistant":
continue
content = m.get("content", "")
if isinstance(content, list):
content = "".join(
p.get("text", "") if isinstance(p, dict) else str(p)
for p in content
)
content_lower = content.lower()
if (
"please answer the following" in content_lower
or "reply with your answers" in content_lower
or "clarifications" in content_lower
):
asked_for_clarification = True
clarification_prompt_index = messages.index(m)
clarification_prompt_text = content
break

# Test mode - just echo back to verify pipe is working
if self.valves.TEST_MODE:
Expand All @@ -591,6 +737,139 @@ async def pipe(self, body: dict, __user__: dict):
}
return

clarification_enabled = self.valves.ENABLE_CLARIFICATION
clarification_answers = body.get("clarification_answers")

# If the UI didn't supply clarification_answers but we detect a prior clarification prompt
# and a new user reply, treat the latest user message as the answers.
if (
clarification_enabled
and not clarification_answers
and asked_for_clarification
and isinstance(user_input, str)
):
candidate_lines = [ln.strip() for ln in user_input.splitlines() if ln.strip()]
if candidate_lines:
# Strip common list prefixes
cleaned = []
for ln in candidate_lines:
cleaned.append(ln.lstrip("-•").lstrip("1234567890. ").strip())
clarification_answers = cleaned

# Ask clarifying questions before any enrichment/research
if clarification_enabled and not clarification_answers:
desired_count = max(3, min(self.valves.CLARIFICATION_QUESTION_COUNT, 5))
questions, debug_info = await self.generate_clarification_questions(
user_input,
headers,
desired_count,
debug=self.valves.DEBUG_CLARIFICATION,
)

# Stream the questions to the user and stop so they can reply
yield {
"choices": [{
"delta": {
"role": "assistant",
"content": "❓ Before I start deep research, I need a few clarifications.\n"
}
}]
}
yield {
"choices": [{
"delta": {
"content": f"Please answer the following {len(questions)} questions:\n"
}
}]
}

for idx, q in enumerate(questions, start=1):
yield {
"choices": [{
"delta": {
"content": f"{idx}. {q}\n"
}
}]
}

if self.valves.DEBUG_CLARIFICATION and debug_info:
yield {
"choices": [{
"delta": {
"content": "\n[debug] clarification generation response:\n"
}
}]
}
yield {
"choices": [{
"delta": {
"content": f"{debug_info}\n"
}
}]
}

yield {
"choices": [{
"delta": {
"content": "\nReply with your answers so I can begin the research.\n"
}
}]
}
yield {
"choices": [{
"delta": {},
"finish_reason": "stop"
}]
}
return

# If answers are provided, fold them into the user input
if clarification_enabled and clarification_answers:
answers_lines = []
if isinstance(clarification_answers, dict):
for k, v in clarification_answers.items():
answers_lines.append(f"- {k}: {v}")
elif isinstance(clarification_answers, list):
for idx, ans in enumerate(clarification_answers, start=1):
answers_lines.append(f"{idx}. {ans}")
else:
answers_lines.append(str(clarification_answers))

clar_block = "\n".join(answers_lines)

# Extract clarification questions from the prior assistant prompt if available
clar_questions = []
if clarification_prompt_text:
for ln in clarification_prompt_text.splitlines():
stripped = ln.strip()
if stripped[:1].isdigit() or stripped.startswith("-"):
clar_questions.append(stripped.lstrip("-").strip())

questions_section = ""
if clar_questions:
questions_section = "Clarification questions:\n" + "\n".join(
f"{idx+1}. {q}" for idx, q in enumerate(clar_questions)
)

base_input = original_user_input or user_input
prompt_parts = [
"Original question:",
base_input,
]
if questions_section:
prompt_parts.append(questions_section)
prompt_parts.append("Clarification answers:")
prompt_parts.append(clar_block)
user_input = "\n".join(prompt_parts)
yield {
"choices": [{
"delta": {
"role": "assistant",
"content": "✅ Clarifications received. Proceeding with deep research.\n\n"
}
}]
}

# Enrich the prompt if enabled
if self.valves.ENABLE_PROMPT_ENRICHMENT:
try:
Expand Down Expand Up @@ -657,10 +936,18 @@ async def pipe(self, body: dict, __user__: dict):
}]
}

preview_lines = []
preview_lines.append(
f"📝 Input preview: {enriched_input[:100]}{'...' if len(enriched_input) > 100 else ''}"
)
if clarification_enabled and clarification_answers:
preview_lines.append("↪ Includes: original question + clarifications")
preview = "\n".join(preview_lines) + "\n"

yield {
"choices": [{
"delta": {
"content": f"📝 Input: {enriched_input[:100]}{'...' if len(enriched_input) > 100 else ''}\n"
"content": preview
}
}]
}
Expand Down