Skip to content

Commit

Permalink
FEAT: Unified Notifications Feature Implementation (#1089)
Browse files Browse the repository at this point in the history
* Backend changes to support logs storage in redis

* Implemented logs_helper plugin

* UI changes to support unified notifications

* Fixed sonar issues

* Clear the logs message on logout

* UX Improvements

* Add comment to describe the 'store_log' method

---------

Signed-off-by: Tahier Hussain <[email protected]>
  • Loading branch information
tahierhussain authored Mar 4, 2025
1 parent fd067c9 commit c7f0ecf
Show file tree
Hide file tree
Showing 28 changed files with 727 additions and 280 deletions.
3 changes: 3 additions & 0 deletions backend/account_v2/authentication_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from django.db.utils import IntegrityError
from django.middleware import csrf
from django.shortcuts import redirect
from logs_helper.log_service import LogService
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
Expand Down Expand Up @@ -268,6 +269,8 @@ def make_user_organization_display_name(self, user_name: str) -> str:
return self.auth_service.make_user_organization_display_name(user_name)

def user_logout(self, request: Request) -> Response:
session_id: str = UserSessionUtils.get_session_id(request=request)
LogService.remove_logs_on_logout(session_id=session_id)
response = self.auth_service.user_logout(request=request)
organization_id = UserSessionUtils.get_organization_id(request)
user_id = UserSessionUtils.get_user_id(request)
Expand Down
3 changes: 3 additions & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ def get_required_setting(
get_required_setting("LOG_HISTORY_CONSUMER_INTERVAL", "60")
)
LOGS_BATCH_LIMIT = int(get_required_setting("LOGS_BATCH_LIMIT", "30"))
LOGS_EXPIRATION_TIME_IN_SECOND = int(
get_required_setting("LOGS_EXPIRATION_TIME_IN_SECOND")
)
CELERY_BROKER_URL = get_required_setting(
"CELERY_BROKER_URL", f"redis://{REDIS_HOST}:{REDIS_PORT}"
)
Expand Down
1 change: 1 addition & 0 deletions backend/backend/urls_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
path("api/", include("api_v2.urls")),
path("usage/", include("usage_v2.urls")),
path("notifications/", include("notification_v2.urls")),
path("logs/", include("logs_helper.urls")),
path(
UrlPathConstants.PROMPT_STUDIO,
include("prompt_studio.prompt_profile_manager_v2.urls"),
Expand Down
Empty file added backend/logs_helper/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions backend/logs_helper/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.apps import AppConfig


class LogsHelperConfig(AppConfig):
name = "logs_helper"
3 changes: 3 additions & 0 deletions backend/logs_helper/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class LogsHelperKeys:
LOG = "LOG"
LOG_EVENTS_ID = "log_events_id"
24 changes: 24 additions & 0 deletions backend/logs_helper/log_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from utils.cache_service import CacheService


class LogService:
@staticmethod
def remove_logs_on_logout(session_id: str) -> None:

if session_id:
key_pattern = f"{LogService.generate_redis_key(session_id=session_id)}*"

# Delete keys matching the pattern
CacheService.clear_cache(key_pattern=key_pattern)

@staticmethod
def generate_redis_key(session_id):
"""Generate a Redis key for logs based on the provided session_id.
Parameters:
session_id (str): The session identifier to include in the Redis key.
Returns:
str: The constructed Redis key.
"""
return f"logs:{session_id}"
5 changes: 5 additions & 0 deletions backend/logs_helper/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from rest_framework import serializers


class StoreLogMessagesSerializer(serializers.Serializer):
log = serializers.CharField()
16 changes: 16 additions & 0 deletions backend/logs_helper/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from django.urls import path
from rest_framework.urlpatterns import format_suffix_patterns

from .views import LogsHelperViewSet

logs_helper = LogsHelperViewSet.as_view({"get": "get_logs", "post": "store_log"})

urlpatterns = format_suffix_patterns(
[
path(
"",
logs_helper,
name="logs-helper",
),
]
)
68 changes: 68 additions & 0 deletions backend/logs_helper/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import json
import logging
from datetime import datetime, timezone

from django.conf import settings
from django.http import HttpRequest
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from utils.cache_service import CacheService
from utils.user_session import UserSessionUtils

from .log_service import LogService
from .serializers import StoreLogMessagesSerializer

logger = logging.getLogger(__name__)


class LogsHelperViewSet(viewsets.ModelViewSet):
"""Viewset to handle all Tool Studio prompt related API logics."""

@action(detail=False, methods=["get"])
def get_logs(self, request: HttpRequest) -> Response:
# Extract the session ID
session_id: str = UserSessionUtils.get_session_id(request=request)

# Construct the Redis key pattern to match keys
# associated with the session ID
redis_key = LogService.generate_redis_key(session_id=session_id)

# Retrieve keys matching the pattern
keys = CacheService.get_all_keys(f"{redis_key}*")

# Retrieve values corresponding to the keys and sort them by timestamp
logs = []
for key in keys:
log_data = CacheService.get_key(key)
logs.append(log_data)

# Sort logs based on timestamp
sorted_logs = sorted(logs, key=lambda x: x["timestamp"])

return Response({"data": sorted_logs}, status=status.HTTP_200_OK)

# This API will be triggered whenever a notification message
# pops up in the UI.
@action(detail=False, methods=["post"])
def store_log(self, request: HttpRequest) -> Response:
"""Store log message in Redis."""
# Extract the session ID
logs_expiry = settings.LOGS_EXPIRATION_TIME_IN_SECOND
session_id: str = UserSessionUtils.get_session_id(request=request)

serializer = StoreLogMessagesSerializer(data=request.data)
serializer.is_valid(raise_exception=True)

# Extract the log message from the validated data
log: str = serializer.validated_data.get("log")
log_data = json.loads(log)
timestamp = datetime.now(timezone.utc).timestamp()

redis_key = (
f"{LogService.generate_redis_key(session_id=session_id)}:{timestamp}"
)

CacheService.set_key(redis_key, log_data, logs_expiry)

return Response({"message": "Successfully stored the message in redis"})
2 changes: 2 additions & 0 deletions backend/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ ENABLE_LOG_HISTORY=True
LOG_HISTORY_CONSUMER_INTERVAL=30
# Maximum number of logs to insert in a single batch.
LOGS_BATCH_LIMIT=30
# Logs Expiry of 24 hours
LOGS_EXPIRATION_TIME_IN_SECOND=86400

# Celery Configuration
CELERY_BROKER_URL = "redis://unstract-redis:6379"
Expand Down
6 changes: 6 additions & 0 deletions backend/utils/cache_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def set_key(
expire,
)

@staticmethod
def get_all_keys(key_pattern: str) -> Any:
keys = redis_cache.keys(key_pattern)
# Ensure all keys are strings
return [key.decode("utf-8") if isinstance(key, bytes) else key for key in keys]

@staticmethod
def clear_cache(key_pattern: str) -> Any:
"""Delete keys in bulk based on the key pattern."""
Expand Down
4 changes: 4 additions & 0 deletions backend/utils/user_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def set_organization_member_role(
) -> None:
request.session["role"] = member.role

@staticmethod
def get_session_id(request: HttpRequest) -> Optional[str]:
return request.session.session_key

@staticmethod
def get_organization_member_role(request: HttpRequest) -> Optional[str]:
return request.session.get("role")
Expand Down
11 changes: 11 additions & 0 deletions frontend/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import PostHogPageviewTracker from "./PostHogPageviewTracker.js";
import { PageTitle } from "./components/widgets/page-title/PageTitle.jsx";
import { useEffect } from "react";
import CustomMarkdown from "./components/helpers/custom-markdown/CustomMarkdown.jsx";
import { useSocketLogsStore } from "./store/socket-logs-store.js";

let GoogleTagManagerHelper;
try {
Expand All @@ -24,6 +25,7 @@ function App() {
const { defaultAlgorithm, darkAlgorithm } = theme;
const { sessionDetails } = useSessionStore();
const { alertDetails } = useAlertStore();
const { pushLogMessages } = useSocketLogsStore();

const btn = (
<>
Expand Down Expand Up @@ -55,6 +57,15 @@ function App() {
btn,
key: alertDetails?.key,
});

pushLogMessages([
{
timestamp: Math.floor(Date.now() / 1000),
level: alertDetails?.type ? alertDetails?.type.toUpperCase() : "",
message: alertDetails.content,
type: "NOTIFICATION",
},
]);
}, [alertDetails]);

return (
Expand Down
79 changes: 3 additions & 76 deletions frontend/src/components/agency/agency/Agency.jsx
Original file line number Diff line number Diff line change
@@ -1,79 +1,33 @@
import { Button, Collapse, Layout, Modal } from "antd";
import {
FullscreenExitOutlined,
FullscreenOutlined,
LeftOutlined,
RightOutlined,
} from "@ant-design/icons";
import { Button, Layout } from "antd";
import { LeftOutlined, RightOutlined } from "@ant-design/icons";
import Sider from "antd/es/layout/Sider";
import { useEffect, useState } from "react";

import { IslandLayout } from "../../../layouts/island-layout/IslandLayout";
import { Actions } from "../actions/Actions";
import { WorkflowExecution } from "../workflow-execution/WorkflowExecution";
import "./Agency.css";
import { useSocketLogsStore } from "../../../store/socket-logs-store";
import { useSocketMessagesStore } from "../../../store/socket-messages-store";
import { useWorkflowStore } from "../../../store/workflow-store";
import { LogsLabel } from "../logs-label/LogsLabel";
import { SidePanel } from "../side-panel/SidePanel";
import { DisplayLogs } from "../display-logs/DisplayLogs";
import { PageTitle } from "../../widgets/page-title/PageTitle";

function Agency() {
const [isCollapsed, setIsCollapsed] = useState(false);
const [activeKey, setActiveKey] = useState([]);
const [steps, setSteps] = useState([]);
const [inputMd, setInputMd] = useState("");
const [outputMd, setOutputMd] = useState("");
const [statusBarMsg, setStatusBarMsg] = useState("");
const [sourceMsg, setSourceMsg] = useState("");
const [destinationMsg, setDestinationMsg] = useState("");
const { message, setDefault } = useSocketMessagesStore();
const { emptyLogs } = useSocketLogsStore();
const workflowStore = useWorkflowStore();
const { details, loadingType, projectName } = workflowStore;
const prompt = details?.prompt_text;
const [activeToolId, setActiveToolId] = useState("");
const [prevLoadingType, setPrevLoadingType] = useState("");
const [isUpdateSteps, setIsUpdateSteps] = useState(false);
const [stepLoader, setStepLoader] = useState(false);
const [showLogsModal, setShowLogsModal] = useState(false);

const openLogsModal = () => {
setShowLogsModal(true);
};

const closeLogsModal = () => {
setShowLogsModal(false);
};

const genExtra = () => (
<FullscreenOutlined
onClick={(event) => {
// If you don't want click extra trigger collapse, you can prevent this:
openLogsModal();
event.stopPropagation();
}}
/>
);

const getItems = () => [
{
key: "1",
label: activeKey?.length > 0 ? <LogsLabel /> : "Logs",
children: (
<div className="agency-ide-logs">
<DisplayLogs />
</div>
),
extra: genExtra(),
},
];

const handleCollapse = (keys) => {
setActiveKey(keys);
};

useEffect(() => {
if (prevLoadingType !== "EXECUTE") {
Expand Down Expand Up @@ -103,7 +57,6 @@ function Agency() {
setOutputMd("");
setStatusBarMsg("");
setDefault();
emptyLogs();
setSourceMsg("");
setDestinationMsg("");
};
Expand All @@ -112,7 +65,6 @@ function Agency() {
// Clean up function to clear all the socket messages
return () => {
setDefault();
emptyLogs();
};
}, []);

Expand All @@ -139,7 +91,6 @@ function Agency() {
}

if (msgComp === "SOURCE" && state === "RUNNING") {
setActiveKey("");
setSourceMsg("");
setDestinationMsg("");
const newSteps = [...steps].map((step) => {
Expand Down Expand Up @@ -227,31 +178,7 @@ function Agency() {
stepLoader={stepLoader}
/>
</div>
<div className="agency-footer">
<Collapse
className="agency-ide-collapse-panel"
size="small"
activeKey={activeKey}
items={getItems()}
expandIconPosition="end"
onChange={handleCollapse}
bordered={false}
/>
<Modal
title="Logs"
open={showLogsModal}
onCancel={closeLogsModal}
className="agency-ide-log-modal"
footer={null}
width={1000}
closeIcon={<FullscreenExitOutlined />}
>
<LogsLabel />
<div className="agency-ide-logs">
<DisplayLogs />
</div>
</Modal>
</div>
<div className="height-20" />
</div>
);
}
Expand Down
Loading

0 comments on commit c7f0ecf

Please sign in to comment.