-
Notifications
You must be signed in to change notification settings - Fork 1
/
weatherman.py
90 lines (75 loc) · 2.71 KB
/
weatherman.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
from typing import Any
from autogen import UserProxyAgent
from autogen.agentchat import ConversableAgent
from fastagency import UI, FastAgency
from fastagency.api.openapi import OpenAPI
from fastagency.runtimes.autogen.autogen import AutoGenWorkflows
from fastagency.ui.mesop import MesopUI
# LLM settings handed to every agent in this file: a single Together-hosted
# Llama 3.1 70B endpoint, with the API key read from the environment.
llm_config = {
    "config_list": [
        {
            "model": "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
            # Resolves to None when TOGETHER_API_KEY is not set.
            "api_key": os.environ.get("TOGETHER_API_KEY"),
            "api_type": "together",
            "hide_tools": "if_any_run",
        },
    ],
    "temperature": 0.8,
}
# URL of the OpenAPI specification for the weather service.
openapi_url = "https://weather.tools.fastagency.ai/openapi.json"
# API object created from that specification URL; weather_workflow passes it
# to wf.register_api.
weather_api = OpenAPI.create(openapi_url=openapi_url)
# System prompt for the weather agent. It is stored as one string whose lines
# are separated by single newlines, exactly as listed below.
weather_agent_system_message = "\n".join(
    (
        "You are a real-time weather agent. When asked",
        "about the weather for a specific city, NEVER provide any information from",
        'memory. ALWAYS respond with: "Please wait while I fetch the weather data for',
        '[city name]..." and immediately call the provided function to retrieve',
        "real-time data for that city. Be concise in your response. Only handle one",
        "city request at a time.",
    )
)
# Workflow container: weather_workflow is registered on it via @wf.register,
# and it is passed to FastAgency as the provider at the bottom of the file.
wf = AutoGenWorkflows()
@wf.register(name="simple_weather", description="Weather chat")  # type: ignore[type-var]
def weather_workflow(ui: UI, params: dict[str, Any]) -> str:
    """Run a short weather chat and return its summary.

    The user is prompted through ``ui`` for an opening question. Two agents
    are then created, the weather API functions are registered for them, and
    the user agent starts a chat with the weather agent using that question.

    Args:
        ui: Interface used to ask the user for the opening message.
        params: Workflow parameters; not read by this function.

    Returns:
        The ``summary`` attribute of the chat result.
    """
    # Ask the user first, before any agent is constructed.
    opening_question = ui.text_input(
        sender="Workflow",
        recipient="User",
        prompt="I can help you with the weather. What would you like to know?",
    )

    # Both agents use the shared llm_config and never ask for human input.
    proxy_agent = UserProxyAgent(
        name="User_Agent",
        system_message="You are a user agent",
        llm_config=llm_config,
        human_input_mode="NEVER",
    )
    forecast_agent = ConversableAgent(
        name="Weather_Agent",
        system_message=weather_agent_system_message,
        llm_config=llm_config,
        human_input_mode="NEVER",
    )

    # Functions of the weather API to register: the daily endpoint is given
    # with an explicit name and description, the hourly one by identifier only.
    api_functions = [
        {
            "get_daily_weather_daily_get": {
                "name": "get_daily_weather",
                "description": "Get the daily weather",
            }
        },
        "get_hourly_weather_hourly_get",
    ]
    wf.register_api(  # type: ignore[attr-defined]
        api=weather_api,
        callers=[proxy_agent],
        executors=[forecast_agent],
        functions=api_functions,
    )

    # The conversation is capped at five turns and summarised by the LLM.
    outcome = proxy_agent.initiate_chat(
        forecast_agent,
        message=opening_question,
        summary_method="reflection_with_llm",
        max_turns=5,
    )
    return outcome.summary  # type: ignore[no-any-return]
# Application object combining the workflow provider with a Mesop UI; this is
# the `app` named in the gunicorn command below (weatherman:app).
app = FastAgency(provider=wf, ui=MesopUI())
# install requirements with the following command
# pip install "fastagency[autogen,mesop,openapi,fastagency,server]"
# running it with the following command
# gunicorn weatherman:app -b 0.0.0.0:8000 --reload