-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
262 lines (213 loc) · 8.79 KB
/
main.py
File metadata and controls
262 lines (213 loc) · 8.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""
FastAPI-based web application for the Synthetic Data Agent.
This module provides a RESTful API and web interface for managing AI browser automation tasks.
It integrates with a Kedro pipeline to execute data generation workflows and provides endpoints
for recording management, task execution, and browser session replay.
Key Features:
- Start AI agent tasks with custom parameters (URL, retry limits, headless mode)
- List and download browser session recordings with metadata
- Replay recorded browser sessions
- Manage test scripts for automation workflows
- Serve a web dashboard for easy interaction
The application bootstraps a Kedro project on startup and manages various data directories
including recordings, metadata, test scripts, and downloads. All agent tasks are executed
asynchronously to avoid blocking the web interface.
Environment Configuration:
- Loads environment variables from .env file if present
- Configurable port (default: 8000)
- Structured data directories under the project root
API Endpoints:
- GET /: Serves the main dashboard HTML page
- POST /start-agent: Initiates agent pipeline execution
- GET /recordings: Lists recorded sessions with pagination
- POST /replay: Replays a specific recording
- GET /download/{filename}: Downloads recording as ZIP with metadata
- GET /test-scripts: Lists available test scripts
- GET /test-scripts/{filename}: Downloads specific test script
Usage:
Run directly: python main.py
Or with uvicorn: uvicorn main:app --reload
"""
# Standard library imports
import asyncio
import json
import os
from pathlib import Path
from typing import Optional
from zipfile import ZipFile
# Third-party imports
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from pydantic import BaseModel
# Kedro imports
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
# Local application imports
import synthetic_data_agent as sda
# Bootstrap the Kedro project once at startup.
# The project root is the directory that contains this file.
project_path = Path(__file__).parent.resolve()

# Load the optional .env file before the Kedro project is bootstrapped.
dotenv_path = project_path / ".env"
if dotenv_path.exists():
    load_dotenv(dotenv_path=dotenv_path)
else:
    # This warning helps in debugging if the file is ever moved or missing
    print(f"Warning: .env file not found at {dotenv_path}")

bootstrap_project(project_path)

app = FastAPI()

# --- Configuration ---
# Define paths relative to the project root for the UI to list files
templates_dir = project_path / "templates"
recordings_dir = project_path / "data/08_reporting/recordings"
metadata_dir = project_path / "data/08_reporting/metadata"
test_scripts_dir = project_path / "data/01_raw/test_scripts"

# Ensure directories exist. The data directories are nested several levels
# below the project root, so parents=True is required; without them the
# directory listings done by the endpoints fail on a fresh checkout.
for _required_dir in (
    templates_dir,
    recordings_dir,
    metadata_dir,
    test_scripts_dir,
):
    _required_dir.mkdir(parents=True, exist_ok=True)
# --- Pydantic Models ---
class AgentTask(BaseModel):
    """Request body accepted by POST /start-agent.

    The validated model is dumped to a dict and passed to the Kedro session
    as the ``agent_params`` runtime parameter.
    """

    # Task text; echoed in the "Received task" log line of /start-agent.
    task: str
    # URL associated with the task; echoed in the same log line.
    url: str
    # NOTE(review): presumably the agent's retry limit -- confirm in the pipeline.
    maxRetries: int
    # NOTE(review): accepted values are not defined in this file -- confirm in the pipeline.
    mode: str
    # NOTE(review): presumably whether the browser runs headless -- confirm in the pipeline.
    headless: bool
    # Optional, defaults to None.
    # NOTE(review): presumably the name of a test script -- confirm in the pipeline.
    scriptName: Optional[str] = None
class ReplayTask(BaseModel):
    """Request body accepted by POST /replay."""

    # Name of the recording file, looked up inside the recordings directory.
    filename: str
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def get_home_page():
    """Serve the dashboard home page.

    Returns:
        HTMLResponse: The contents of ``index.html`` from the templates
        directory, with status code 200.

    Raises:
        HTTPException: 404 when ``index.html`` is not a file in the
            templates directory.
    """
    html_path = templates_dir / "index.html"
    # is_file() rather than exists(): a directory named index.html would pass
    # an existence check and then fail while being read.
    if not html_path.is_file():
        raise HTTPException(
            status_code=404,
            detail="index.html not found in templates directory. Please create it.",
        )
    # Decode explicitly as UTF-8 (as the other file reads in this module do)
    # instead of relying on the platform's default encoding.
    return HTMLResponse(content=html_path.read_text(encoding="utf-8"), status_code=200)
# Strong references to in-flight pipeline tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references can be garbage
# collected before it finishes.
_agent_pipeline_tasks: set = set()


@app.post("/start-agent")
async def start_agent(agent_task: AgentTask):
    """Start the agent pipeline with the provided task.

    The Kedro ``data_generation`` pipeline is run in the event loop's default
    executor so the request returns immediately and the web interface is not
    blocked while the pipeline runs.

    Args:
        agent_task: Validated request body; it is dumped to a dict and passed
            to the Kedro session as the ``agent_params`` runtime parameter.

    Returns:
        JSONResponse: Status 202 with a confirmation message. Pipeline
        success or failure is only reported on standard output.
    """
    print(f'Received task: "{agent_task.task}" for URL: {agent_task.url}')

    async def run_kedro_pipeline():
        runtime_params_dict = {"agent_params": agent_task.model_dump()}
        # The try block also covers session creation, so a failure there is
        # reported like any other pipeline failure instead of surfacing as an
        # unretrieved task exception.
        try:
            with KedroSession.create(
                project_path=project_path, runtime_params=runtime_params_dict
            ) as session:
                # get_running_loop() is the supported way to obtain the loop
                # from inside a coroutine.
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(
                    None,
                    session.run,
                    "data_generation",
                )
            print("Kedro pipeline run completed successfully.")
        except Exception as e:
            print(f"Kedro pipeline run failed: {e}")

    pipeline_task = asyncio.create_task(run_kedro_pipeline())
    # Keep the task alive until it completes, then drop the reference.
    _agent_pipeline_tasks.add(pipeline_task)
    pipeline_task.add_done_callback(_agent_pipeline_tasks.discard)
    return JSONResponse(status_code=202, content={"message": "Agent pipeline started."})
@app.get("/test-scripts")
async def get_test_scripts():
    """Fetch the list of available test scripts.

    Returns:
        JSONResponse: The names of the ``.json`` files in the test scripts
        directory, sorted alphabetically. The list is empty when the
        directory does not exist.

    Raises:
        HTTPException: 500 when the directory exists but cannot be read.
    """
    try:
        # Sorted so the response does not depend on the file system's
        # arbitrary listing order; is_file() drops directories that merely
        # have a ".json" suffix.
        files = sorted(
            entry.name
            for entry in test_scripts_dir.iterdir()
            if entry.name.endswith(".json") and entry.is_file()
        )
    except FileNotFoundError:
        # No scripts directory simply means there are no scripts to list.
        files = []
    except OSError as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to read test scripts directory: {e}"
        ) from e
    return JSONResponse(content=files)
@app.get("/test-scripts/{filename}")
async def get_test_script(filename: str):
    """Download a single test script.

    Args:
        filename: Name of a file located directly inside the test scripts
            directory.

    Returns:
        FileResponse: The requested script file.

    Raises:
        HTTPException: 404 when the name contains path components or does
            not refer to a regular file in the test scripts directory.
    """
    # Accept bare file names only, so the request cannot address anything
    # outside the scripts directory. A name such as ".." passes this check
    # but is rejected below because it is not a regular file.
    if Path(filename).name != filename:
        raise HTTPException(status_code=404, detail="Script not found.")
    script_path = test_scripts_dir / filename
    if not script_path.is_file():
        raise HTTPException(status_code=404, detail="Script not found.")
    return FileResponse(script_path)
@app.get("/recordings")
async def get_recordings(page: int = 1):
    """Fetch one page of recordings together with their metadata.

    Only ``.vbrec`` files that have a matching ``<stem>.json`` file in the
    metadata directory are listed, newest first (by modification time).

    Args:
        page: 1-based page number. Values below 1 are treated as 1.

    Returns:
        dict: ``recordings`` (up to 10 entries, each the metadata dict plus a
        ``filename`` key), ``totalPages`` and ``currentPage``.

    Raises:
        HTTPException: 500 when the recordings directory cannot be read.
    """
    limit = 10
    # Clamp so that page <= 0 cannot produce a negative slice start, which
    # would silently return entries counted from the end of the list.
    page = max(page, 1)

    try:
        # A missing directory means there is nothing to list yet.
        if recordings_dir.is_dir():
            all_recording_files = [
                f for f in os.listdir(recordings_dir) if f.endswith(".vbrec")
            ]
        else:
            all_recording_files = []
        all_recording_files.sort(
            key=lambda f: (recordings_dir / f).stat().st_mtime, reverse=True
        )
    except OSError as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to read recordings directory: {e}"
        ) from e

    recordings_with_meta = []
    for recording_file in all_recording_files:
        base_filename = Path(recording_file).stem
        metadata_file = metadata_dir / f"{base_filename}.json"
        # Ensure the corresponding meta file actually exists.
        if not metadata_file.exists():
            continue
        try:
            with open(metadata_file, "r", encoding="utf-8") as f:
                info = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            # One bad file must not take down the whole listing.
            print(f"Skipping unreadable metadata file {metadata_file.name}: {e}")
            continue
        if not isinstance(info, dict):
            print(f"Skipping metadata file {metadata_file.name}: not a JSON object")
            continue
        info.setdefault("similarityScore", 0)  # Default score if not present
        # Keys from the metadata come last, so a "filename" key stored in the
        # metadata takes precedence over the on-disk name (original behavior).
        recordings_with_meta.append({"filename": recording_file, **info})

    total_pages = (len(recordings_with_meta) + limit - 1) // limit
    start_index = (page - 1) * limit
    return {
        "recordings": recordings_with_meta[start_index : start_index + limit],
        "totalPages": total_pages,
        "currentPage": page,
    }
# Strong references to in-flight replay tasks. The event loop keeps only weak
# references to tasks, so a task nobody else references can be garbage
# collected before it finishes.
_replay_tasks: set = set()


@app.post("/replay")
async def replay_session(replay_task: ReplayTask):
    """Start a replay of the specified recording.

    The replay runs as a background task; the response is sent as soon as the
    task has been scheduled.

    Args:
        replay_task: Request body naming a file located directly inside the
            recordings directory.

    Returns:
        JSONResponse: A confirmation message. Replay failures are only
        reported on standard output.

    Raises:
        HTTPException: 404 when the name contains path components or does
            not refer to a regular file in the recordings directory.
    """
    requested_name = replay_task.filename
    # The name comes from the request body: accept bare file names only, so
    # it cannot address anything outside the recordings directory. A name
    # such as ".." passes this check but fails the is_file() test below.
    if Path(requested_name).name != requested_name:
        raise HTTPException(status_code=404, detail="Recording not found.")
    file_path = recordings_dir / requested_name
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="Recording not found.")

    print(f"Starting replay for: {replay_task.filename}")

    async def run_replay_in_background():
        try:
            replayer = sda.AIAgentBrowserReplay(str(file_path))
            await replayer.replay()
        except Exception as e:
            print(f'Replay failed for "{replay_task.filename}": {e}')

    background_replay = asyncio.create_task(run_replay_in_background())
    # Keep the task alive until it completes, then drop the reference.
    _replay_tasks.add(background_replay)
    background_replay.add_done_callback(_replay_tasks.discard)
    return JSONResponse(
        content={"message": "Replay started. A new browser window should open."}
    )
@app.get("/download/{filename}")
async def download_recording(filename: str):
    """Download a recording as a ZIP archive.

    The archive contains the recording itself and, when a matching
    ``<stem>.json`` file exists in the metadata directory, an
    ``agent_metadata.txt`` file with one ``Key: value`` line per metadata key
    (``N/A`` for keys missing from the metadata).

    The archive is built in a temporary file that is deleted after the
    response has been sent, so nothing is written into the recordings
    directory and concurrent downloads do not share a file.

    Args:
        filename: Name of a file located directly inside the recordings
            directory.

    Returns:
        FileResponse: The ZIP archive, offered for download as ``<stem>.zip``.

    Raises:
        HTTPException: 404 when the name contains path components or does
            not refer to a regular file in the recordings directory.
    """
    # Imported locally because only this endpoint needs them.
    import tempfile

    from fastapi import BackgroundTasks

    # Accept bare file names only, so the request cannot address anything
    # outside the recordings directory. A name such as ".." passes this check
    # but fails the is_file() test below.
    if Path(filename).name != filename:
        raise HTTPException(status_code=404, detail="Recording not found.")
    recording_path = recordings_dir / filename
    if not recording_path.is_file():
        raise HTTPException(status_code=404, detail="Recording not found.")

    base_filename = Path(filename).stem
    info_path = metadata_dir / f"{base_filename}.json"

    fd, archive_name = tempfile.mkstemp(suffix=".zip")
    os.close(fd)  # ZipFile reopens the file by path.
    archive_path = Path(archive_name)

    def discard_archive():
        """Delete the temporary archive, ignoring an already-missing file."""
        try:
            archive_path.unlink()
        except FileNotFoundError:
            pass

    try:
        with ZipFile(archive_path, "w") as zipf:
            zipf.write(recording_path, arcname=filename)
            if info_path.exists():
                with open(info_path, "r", encoding="utf-8") as f:
                    info_json = json.load(f)
                # Ensure all keys are present
                keys_to_include = [
                    "task",
                    "url",
                    "timestamp",
                    "maxRetries",
                    "summary",
                    "similarityScore",
                    "reasoning",
                ]
                metadata_content = [
                    f"{key.replace('_', ' ').title()}: {info_json.get(key, 'N/A')}"
                    for key in keys_to_include
                ]
                zipf.writestr("agent_metadata.txt", "\n".join(metadata_content))
    except Exception:
        # Do not leave a partial archive behind when building it fails.
        discard_archive()
        raise

    # The response runs its background tasks after the body has been sent,
    # which is the earliest point at which the archive can be removed.
    cleanup = BackgroundTasks()
    cleanup.add_task(discard_archive)
    return FileResponse(
        archive_path,
        media_type="application/zip",
        filename=f"{base_filename}.zip",
        background=cleanup,
    )
if __name__ == "__main__":
    # The port is configurable through the PORT environment variable, which
    # may also be supplied by the .env file loaded at import time.
    # It falls back to 8000 when the variable is not set.
    PORT = int(os.environ.get("PORT", "8000"))
    print(f"\nAI Browser Agent Dashboard running at http://localhost:{PORT}")
    print("Open this URL in your browser to start.")
    uvicorn.run(app, host="0.0.0.0", port=PORT)