forked from Alibaba-NLP/DeepResearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtool_visit.py
More file actions
256 lines (222 loc) · 10.9 KB
/
tool_visit.py
File metadata and controls
256 lines (222 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import json
import os
import signal
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Union
import requests
from qwen_agent.tools.base import BaseTool, register_tool
from prompt import EXTRACTOR_PROMPT
from openai import OpenAI
import random
from urllib.parse import urlparse, unquote
import time
from transformers import AutoTokenizer
import tiktoken
VISIT_SERVER_TIMEOUT = int(os.getenv("VISIT_SERVER_TIMEOUT", 200))
WEBCONTENT_MAXLENGTH = int(os.getenv("WEBCONTENT_MAXLENGTH", 150000))
JINA_API_KEYS = os.getenv("JINA_API_KEYS", "")
@staticmethod
def truncate_to_tokens(text: str, max_tokens: int = 95000) -> str:
encoding = tiktoken.get_encoding("cl100k_base")
tokens = encoding.encode(text)
if len(tokens) <= max_tokens:
return text
truncated_tokens = tokens[:max_tokens]
return encoding.decode(truncated_tokens)
OSS_JSON_FORMAT = """# Response Formats
## visit_content
{"properties":{"rational":{"type":"string","description":"Locate the **specific sections/data** directly related to the user's goal within the webpage content"},"evidence":{"type":"string","description":"Identify and extract the **most relevant information** from the content, never miss any important information, output the **full original context** of the content as far as possible, it can be more than three paragraphs.","summary":{"type":"string","description":"Organize into a concise paragraph with logical flow, prioritizing clarity and judge the contribution of the information to the goal."}}}}"""
@register_tool('visit', allow_overwrite=True)
class Visit(BaseTool):
# The `description` tells the agent the functionality of this tool.
name = 'visit'
description = 'Visit webpage(s) and return the summary of the content.'
# The `parameters` tell the agent what input parameters the tool has.
parameters = {
"type": "object",
"properties": {
"url": {
"type": ["string", "array"],
"items": {
"type": "string"
},
"minItems": 1,
"description": "The URL(s) of the webpage(s) to visit. Can be a single URL or an array of URLs."
},
"goal": {
"type": "string",
"description": "The goal of the visit for webpage(s)."
}
},
"required": ["url", "goal"]
}
# The `call` method is the main function of the tool.
def call(self, params: Union[str, dict], **kwargs) -> str:
try:
url = params["url"]
goal = params["goal"]
except:
return "[Visit] Invalid request format: Input must be a JSON object containing 'url' and 'goal' fields"
start_time = time.time()
# Create log folder if it doesn't exist
log_folder = "log"
os.makedirs(log_folder, exist_ok=True)
if isinstance(url, str):
response = self.readpage_jina(url, goal)
else:
response = []
assert isinstance(url, List)
start_time = time.time()
for u in url:
if time.time() - start_time > 900:
cur_response = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
cur_response += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"
cur_response += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"
else:
try:
cur_response = self.readpage_jina(u, goal)
except Exception as e:
cur_response = f"Error fetching {u}: {str(e)}"
response.append(cur_response)
response = "\n=======\n".join(response)
print(f'Summary Length {len(response)}; Summary Content {response}')
return response.strip()
def call_server(self, msgs, max_retries=2):
api_key = os.environ.get("API_KEY")
url_llm = os.environ.get("API_BASE")
model_name = os.environ.get("SUMMARY_MODEL_NAME", "")
client = OpenAI(
api_key=api_key,
base_url=url_llm,
)
for attempt in range(max_retries):
try:
chat_response = client.chat.completions.create(
model=model_name,
messages=msgs,
temperature=0.7
)
content = chat_response.choices[0].message.content
if content:
try:
json.loads(content)
except:
# extract json from string
left = content.find('{')
right = content.rfind('}')
if left != -1 and right != -1 and left <= right:
content = content[left:right+1]
return content
except Exception as e:
# print(e)
if attempt == (max_retries - 1):
return ""
continue
def jina_readpage(self, url: str) -> str:
"""
Read webpage content using Jina service.
Args:
url: The URL to read
goal: The goal/purpose of reading the page
Returns:
str: The webpage content or error message
"""
max_retries = 3
timeout = 50
for attempt in range(max_retries):
headers = {
"Authorization": f"Bearer {JINA_API_KEYS}",
}
try:
response = requests.get(
f"https://r.jina.ai/{url}",
headers=headers,
timeout=timeout
)
if response.status_code == 200:
webpage_content = response.text
return webpage_content
else:
print(response.text)
raise ValueError("jina readpage error")
except Exception as e:
time.sleep(0.5)
if attempt == max_retries - 1:
return "[visit] Failed to read page."
return "[visit] Failed to read page."
def html_readpage_jina(self, url: str) -> str:
max_attempts = 8
for attempt in range(max_attempts):
content = self.jina_readpage(url)
service = "jina"
print(service)
if content and not content.startswith("[visit] Failed to read page.") and content != "[visit] Empty content." and not content.startswith("[document_parser]"):
return content
return "[visit] Failed to read page."
def readpage_jina(self, url: str, goal: str) -> str:
"""
Attempt to read webpage content by alternating between jina and aidata services.
Args:
url: The URL to read
goal: The goal/purpose of reading the page
Returns:
str: The webpage content or error message
"""
summary_page_func = self.call_server
max_retries = int(os.getenv('VISIT_SERVER_MAX_RETRIES', 1))
content = self.html_readpage_jina(url)
if content and not content.startswith("[visit] Failed to read page.") and content != "[visit] Empty content." and not content.startswith("[document_parser]"):
content = truncate_to_tokens(content, max_tokens=95000)
messages = [{"role":"user","content": EXTRACTOR_PROMPT.format(webpage_content=content, goal=goal)}]
parse_retry_times = 0
raw = summary_page_func(messages, max_retries=max_retries)
summary_retries = 3
while len(raw) < 10 and summary_retries >= 0:
truncate_length = int(0.7 * len(content)) if summary_retries > 0 else 25000
status_msg = (
f"[visit] Summary url[{url}] "
f"attempt {3 - summary_retries + 1}/3, "
f"content length: {len(content)}, "
f"truncating to {truncate_length} chars"
) if summary_retries > 0 else (
f"[visit] Summary url[{url}] failed after 3 attempts, "
f"final truncation to 25000 chars"
)
print(status_msg)
content = content[:truncate_length]
extraction_prompt = EXTRACTOR_PROMPT.format(
webpage_content=content,
goal=goal
)
messages = [{"role": "user", "content": extraction_prompt}]
raw = summary_page_func(messages, max_retries=max_retries)
summary_retries -= 1
parse_retry_times = 2
if isinstance(raw, str):
raw = raw.replace("```json", "").replace("```", "").strip()
while parse_retry_times < 3:
try:
raw = json.loads(raw)
break
except:
raw = summary_page_func(messages, max_retries=max_retries)
parse_retry_times += 1
if parse_retry_times >= 3:
useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
useful_information += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"
useful_information += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"
else:
useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
useful_information += "Evidence in page: \n" + str(raw["evidence"]) + "\n\n"
useful_information += "Summary: \n" + str(raw["summary"]) + "\n\n"
if len(useful_information) < 10 and summary_retries < 0:
print("[visit] Could not generate valid summary after maximum retries")
useful_information = "[visit] Failed to read page"
return useful_information
# If no valid content was obtained after all retries
else:
useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
useful_information += "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"
useful_information += "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"
return useful_information