Skip to content

Commit

Permalink
Merge branch 'feature/integration' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
peterxcli committed Jan 26, 2024
2 parents ab60b27 + f2952ac commit 46299e5
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 98 deletions.
25 changes: 8 additions & 17 deletions bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,21 @@ async def on_ready(self):
print(f"Logged in as {self.client.user}")

async def on_message(self, message):
print("on_message")
if message.author == self.client.user:
return

for thread in self.discord_thread_manager.get_active_threads():
if message.channel.id == thread.id:
await process_feedback(message, thread)

def feedback_retrieval(message, thread_id):
# TODO: Peter這邊 feedback_retrival(message, thread_id) 這裡面要做 store 的動作
return "AI response"
await self.process_feedback(message, thread)

async def process_feedback(self, message, thread):
print(f"收到feedback:{message.content}")
await thread.send(
"Feedback received ! Thanks for your feedback, we will use this to improve our message!"
ai_response = self.monitor_runner.get_agent_response(
str(thread.id),
message.content,
)
print(message.content)
print(thread.id)
# Peter這邊 feedback_retrival(message.content, thread.id)
ai_response = self.monitor_runner.get_agent_response(message.content, thread.id)
await thread.send(ai_response)

async def send_alert(self, message_dict: dict):
print("broadcasting")
# test_channel_id = self.token
channel = self.client.get_channel(self.channel_id)
if channel:
try:
Expand All @@ -74,6 +63,9 @@ def run(self):
self.client.event(self.on_message)
self.client.run(self.token)

async def cleanup(self):
    """Shut down the Discord client connection; awaited during process teardown."""
    await self.client.close()


load_dotenv()
DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
Expand All @@ -96,12 +88,11 @@ def run(self):
"memory": 0,
"instance": 1,
"message": "The application is experiencing high latency and is unable to keep up with the demand. The number of tasks in the queue has been above 100 for the past 5 minutes and the average task execution time has been above 30 seconds. I recommend increasing the number of instances by 1.",
"timestamp": "2024-01-26 11:05:49+00:00"
"timestamp": "2024-01-26 11:05:49+00:00",
# 'metric_dataframe': pd.DataFrame
}



async def update_active_threads():
global active_threads
active_threads = await get_active_threads()
Expand Down
6 changes: 2 additions & 4 deletions bot/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from datetime import datetime

import discord


Expand All @@ -24,8 +22,8 @@ async def send_embedded_message(channel: discord.channel, info: dict):
)

# Set the timestamp
timestamp_format = "%Y-%m-%d %H:%M:%S%z"
parsed_timestamp = datetime.strptime(info["timestamp"], timestamp_format)
# timestamp_format = "%Y-%m-%d %H:%M:%S%z"
parsed_timestamp = info["timestamp"]
embed.timestamp = parsed_timestamp

# Set the footer
Expand Down
50 changes: 43 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,39 @@
import asyncio
import os
import threading

import discord
from dotenv import load_dotenv

from ai2.log_analyzer import LLMLogAnalyzer
from bot.bot import DiscordBot
from bot.thread import DiscordThreadManager
from monitor.monitor_runner import FakeChatAI, MonitorRunner
from monitor.monitor_runner import MonitorRunner
from monitor.service.cloudrun import CloudRunManager


class StoppableThread(threading.Thread):
    """Thread with a cooperative stop() method.

    The target itself has to poll stopped() regularly and exit when it
    returns True; stop() only raises the flag, it does not kill the thread.
    """

    def __init__(self, *args, **kwargs):
        # Zero-argument super() (modern Python 3 idiom); otherwise identical
        # to threading.Thread.__init__.
        super().__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        """Signal the thread to stop."""
        self._stop_event.set()

    def stopped(self):
        """Return True once stop() has been called."""
        return self._stop_event.is_set()


if __name__ == "__main__":
load_dotenv()
discord_bot_token = os.getenv("DISCORD_BOT_TOKEN")
discord_dst_channel_id = int(os.getenv("DISCORD_DST_CHANNEL_ID"))
pinecone_api_key = os.getenv("PINECONE_API_KEY")
service_name = "consumer-latest"

# init chat agent
fake_chat_ai = FakeChatAI()

# init discord client
intents = discord.Intents.default()
intents.messages = True
Expand All @@ -29,9 +45,20 @@
# init cloudrun manager
cloudrun_manager = CloudRunManager()

# inti llm log analyzer
llm_args = {
"model_name": "text-bison@001",
"max_output_tokens": 1024,
"temperature": 0,
}
index_name = "tsmc-hackathon"
llm_log_analyzer = LLMLogAnalyzer(
pinecone_api_key=pinecone_api_key, index_name=index_name, llm_args=llm_args
)

client = discord.Client(intents=intents)
monitor_runner = MonitorRunner(
chat_agent=fake_chat_ai,
llm_log_analyzer=llm_log_analyzer,
client=client,
channel_id=discord_dst_channel_id,
discord_thread_manager=discord_thread_manager,
Expand All @@ -47,7 +74,16 @@
monitor_runner=monitor_runner,
)

discord_thread = threading.Thread(target=discord_bot.run)
discord_thread = StoppableThread(target=discord_bot.run)
discord_thread.start()

monitor_runner.run()
try:
asyncio.run(monitor_runner.run())
except KeyboardInterrupt:
print("\nshutting down...")
except Exception as e:
print(e)
exit(1)
discord_thread.stop()
asyncio.run_coroutine_threadsafe(discord_bot.cleanup(), discord_bot.client.loop)
discord_thread.join(2)
161 changes: 92 additions & 69 deletions monitor/monitor_runner.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import time

import discord
import pandas as pd

from ai2.log_analyzer import LLMLogAnalyzer

# from bot.bot import cun_bot, send_alert
from bot.message import send_embedded_message
from bot.thread import DiscordThreadManager
Expand Down Expand Up @@ -41,45 +42,27 @@ def parser(line: str):
return df


class FakeChatAI:
    """Stand-in chat agent that echoes the user's message.

    Lets the Discord flow be wired up and exercised without a real
    LLM backend.
    """

    def get_response(self, conversion_id: str, user_message: str) -> str:
        """Return a canned reply that echoes *user_message*.

        Args:
            conversion_id (str): id of the Discord thread (unused by the fake).
            user_message (str): the user's new input.

        Returns:
            str: "fake response:" followed by the user's message.
        """
        return "fake response:" + user_message


class MonitorRunner:
def __init__(
self,
chat_agent: FakeChatAI,
llm_log_analyzer: LLMLogAnalyzer,
client: discord.Client,
channel_id: int,
discord_thread_manager: DiscordThreadManager,
target_service_name="consumer-latest",
cloudrun_manager: CloudRunManager = CloudRunManager(),
):
self.chat_agent = chat_agent
self.llm_log_analyzer = llm_log_analyzer
self.discord_client = client
self.channel_id = channel_id
self.cloudrun_manager = cloudrun_manager
self.discord_thread_manager = discord_thread_manager
self.target_service_name = target_service_name

def get_agent_response(self, conversion_id: str, user_message: str):
return self.chat_agent.get_response(conversion_id, user_message)
return self.llm_log_analyzer.chat(conversion_id, user_message)

async def send_alert(self, message_dict: dict):
print("broadcasting")
test_channel_id = self.channel_id
channel = self.discord_client.get_channel(test_channel_id)
if channel:
Expand All @@ -95,58 +78,98 @@ async def create_thread(self, message: discord.Message):
await thread.send("Send me message if you have any suggestion!")
# 將討論串加入 active_threads 之中
self.discord_thread_manager.add_thread(thread)
# print(active_threads)
return thread.id

def run(self):
# bot_thread = threading.Thread(target=run_bot)
# bot_thread.start()
while True:
print("fetching...")

log_df: pd.DataFrame = pd.DataFrame()
for log_line in log.tail_log_entry(
service_name=self.target_service_name, max_results=100
):
parse_df = parser(log_line)
log_df = pd.concat([log_df, parse_df])
def fetch_and_process_logs(self):
    """Tail recent log entries for the target service and parse them
    into a single DataFrame.

    Returns:
        pd.DataFrame: one row per parsed log line; empty when the
        service returned no entries.
    """
    # NOTE(review): tail_log_entry is iterated synchronously here, despite an
    # earlier comment claiming it is async — confirm against monitor.log.
    # Collect frames and concatenate once: pd.concat inside the loop is
    # quadratic in the number of log lines.
    frames = [
        parser(log_line)
        for log_line in log.tail_log_entry(self.target_service_name, max_results=10)
    ]
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames)

def fetch_and_merge_metrics(self, log_df: pd.DataFrame):
    """Fetch service metrics covering the log time window and inner-join
    them with the logs on the "Time" column, sorted chronologically.

    Args:
        log_df (pd.DataFrame): parsed log lines with a "Time" column.

    Returns:
        pd.DataFrame: rows where a log timestamp matched a metric sample.
    """
    window_start = int(log_df["Time"].min().timestamp())
    window_end = int(log_df["Time"].max().timestamp())
    metrics_df = self.cloudrun_manager.get_metrics(
        self.target_service_name, window_start, window_end
    )
    # Normalize both sides to datetime64 so the join keys compare equal.
    for frame in (metrics_df, log_df):
        frame["Time"] = pd.to_datetime(frame["Time"])
    combined = log_df.merge(metrics_df, on="Time", how="inner").sort_values("Time")
    return combined

async def analyze_and_alert(self, merged_df: pd.DataFrame):
"""feed the merged dataframe to the log analyzer and send alert if needed
# get the min and max time
min_time = int(log_df["Time"].min().timestamp())
max_time = int(log_df["Time"].max().timestamp())
Args:
merged_df (pd.DataFrame): merged dataframe of log and metrics
metrics_df: pd.DataFrame = self.cloudrun_manager.get_metrics(
self.target_service_name, min_time, max_time
)
metrics_df["Time"] = pd.to_datetime(metrics_df["Time"])
log_df["Time"] = pd.to_datetime(log_df["Time"])
merged_df = pd.merge(log_df, metrics_df, on="Time", how="inner")
merged_df.sort_values("Time", inplace=True)

# response = analyze_by_llm(merged_df)
response = { # TODO: substitute with analyze_by_llm
"severity": "ERROR",
"cpu": -1,
"memory": 0,
"instance": 1,
"message": "The application is experiencing high latency and is unable to keep up with the demand. The number of tasks in the queue has been above 100 for the past 5 minutes and the average task execution time has been above 30 seconds. I recommend increasing the number of instances by 1.",
"timestamp": "2024-01-26 11:05:49+00:00",
# 'metric_dataframe': pd.DataFrame
}

# if response["need_alert"]: # TODO: add alert condition
Returns:
dict: keys: (['severity', 'cpu', 'memory', 'instance', 'message', 'prompt', 'metric_dataframe', 'timestamp'])
"""
response = self.llm_log_analyzer.analyze_log(merged_df)
if True: # TODO: Replace with actual alert condition
thread_id = asyncio.run_coroutine_threadsafe(
self.send_alert(message_dict=response), self.discord_client.loop
).result()
self.llm_log_analyzer.store_memory(
id=str(thread_id),
log_df=merged_df,
initial_prompt=response["prompt"],
response=response["message"],
)
return response

print("new thread id created: ", thread_id)

# if response["instance"] != 0:
# self.cloudrun_manager.increase_instance_count(
# self.target_service_name, response["instance"]
# )
# if response["cpu"] != 0 or response["ram"] != 0:
# self.cloudrun_manager.increase_cpu_ram(
# self.target_service_name, response["cpu"], response["ram"]
# )
time.sleep(60)
async def update_service_configuration(self, response: dict):
    """Apply the analyzer's recommended resource changes to the target
    Cloud Run service and, if anything changed, print the new configuration.

    Args:
        response (dict): analyzer output with "instance", "cpu" and
            "memory" deltas; 0 means "no change" for that dimension.
    """
    changed = False
    if response["instance"] != 0:
        changed = await self.cloudrun_manager.increase_instance_count(
            self.target_service_name, response["instance"]
        ) or changed
    if response["cpu"] != 0 or response["memory"] != 0:
        # Bug fix: this previously read `changed and await ...`, which
        # short-circuited whenever no instance change had happened, so the
        # CPU/RAM update was silently never awaited. Combine with `or`,
        # consistent with the instance branch above.
        changed = await self.cloudrun_manager.increase_cpu_ram(
            self.target_service_name, response["cpu"], response["memory"]
        ) or changed
    if changed:
        service = await self.cloudrun_manager.get_service(self.target_service_name)
        print(
            "new resource configuration:",
            {
                "cpu": service.template.containers[0].resources.limits["cpu"],
                "memory": service.template.containers[0].resources.limits["memory"],
                "instance": service.template.scaling.min_instance_count,
            },
        )

async def periodic_monitor_task(self):
    """Run one monitoring cycle: fetch logs, merge metrics, analyze,
    then apply any recommended service reconfiguration.

    Each stage is guarded independently; a failure logs the error and
    aborts the remainder of this cycle.
    """
    print("fetching...")

    try:
        log_df = self.fetch_and_process_logs()
    except Exception as err:
        print("Error fetching logs:", err)
        return

    try:
        merged_df = self.fetch_and_merge_metrics(log_df)
    except Exception as err:
        print("Error fetching metrics:", err)
        return

    try:
        verdict = await self.analyze_and_alert(merged_df)
    except Exception as err:
        print("Error analyzing log:", err)
        return

    try:
        await self.update_service_configuration(verdict)
    except Exception as err:
        print("Error updating service configuration:", err)

async def run(self):
    """Run the monitor loop forever: one monitoring cycle every 60 seconds."""
    while True:
        await self.periodic_monitor_task()
        await asyncio.sleep(60)
5 changes: 4 additions & 1 deletion template.env
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
REDIS_HOST=our_redis_host
REDIS_PORT=6379
REDIS_PASSWORD=our_redis_password
REDIS_PASSWORD=our_redis_password
DISCORD_BOT_TOKEN="11213w2323232323"
DISCORD_DST_CHANNEL_ID=3233333333333333
PINECONE_API_KEY=e5xxxxfc5-cxx5-49xx8-bc3c-28xxxxxxxxx54607

0 comments on commit 46299e5

Please sign in to comment.