diff --git a/backend/account_v2/authentication_controller.py b/backend/account_v2/authentication_controller.py index 31dd284d7..2cd621424 100644 --- a/backend/account_v2/authentication_controller.py +++ b/backend/account_v2/authentication_controller.py @@ -38,6 +38,7 @@ from django.db.utils import IntegrityError from django.middleware import csrf from django.shortcuts import redirect +from logs_helper.log_service import LogService from rest_framework import status from rest_framework.request import Request from rest_framework.response import Response @@ -268,6 +269,8 @@ def make_user_organization_display_name(self, user_name: str) -> str: return self.auth_service.make_user_organization_display_name(user_name) def user_logout(self, request: Request) -> Response: + session_id: str = UserSessionUtils.get_session_id(request=request) + LogService.remove_logs_on_logout(session_id=session_id) response = self.auth_service.user_logout(request=request) organization_id = UserSessionUtils.get_organization_id(request) user_id = UserSessionUtils.get_user_id(request) diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index 94db8aa22..6394f5a23 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -130,6 +130,9 @@ def get_required_setting( get_required_setting("LOG_HISTORY_CONSUMER_INTERVAL", "60") ) LOGS_BATCH_LIMIT = int(get_required_setting("LOGS_BATCH_LIMIT", "30")) +LOGS_EXPIRATION_TIME_IN_SECOND = int( + get_required_setting("LOGS_EXPIRATION_TIME_IN_SECOND") +) CELERY_BROKER_URL = get_required_setting( "CELERY_BROKER_URL", f"redis://{REDIS_HOST}:{REDIS_PORT}" ) diff --git a/backend/backend/urls_v2.py b/backend/backend/urls_v2.py index 222679092..84db146a7 100644 --- a/backend/backend/urls_v2.py +++ b/backend/backend/urls_v2.py @@ -33,6 +33,7 @@ path("api/", include("api_v2.urls")), path("usage/", include("usage_v2.urls")), path("notifications/", include("notification_v2.urls")), + path("logs/", include("logs_helper.urls")), path( UrlPathConstants.PROMPT_STUDIO, include("prompt_studio.prompt_profile_manager_v2.urls"), diff --git a/backend/logs_helper/__init__.py b/backend/logs_helper/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/logs_helper/apps.py b/backend/logs_helper/apps.py new file mode 100644 index 000000000..22d78fc06 --- /dev/null +++ b/backend/logs_helper/apps.py @@ -0,0 +1,5 @@ +from django.apps import AppConfig + + +class LogsHelperConfig(AppConfig): + name = "logs_helper" diff --git a/backend/logs_helper/constants.py b/backend/logs_helper/constants.py new file mode 100644 index 000000000..18de76e1f --- /dev/null +++ b/backend/logs_helper/constants.py @@ -0,0 +1,3 @@ +class LogsHelperKeys: + LOG = "LOG" + LOG_EVENTS_ID = "log_events_id" diff --git a/backend/logs_helper/log_service.py b/backend/logs_helper/log_service.py new file mode 100644 index 000000000..61a4bc797 --- /dev/null +++ b/backend/logs_helper/log_service.py @@ -0,0 +1,24 @@ +from utils.cache_service import CacheService + + +class LogService: + @staticmethod + def remove_logs_on_logout(session_id: str) -> None: + + if session_id: + key_pattern = f"{LogService.generate_redis_key(session_id=session_id)}*" + + # Delete keys matching the pattern + CacheService.clear_cache(key_pattern=key_pattern) + + @staticmethod + def generate_redis_key(session_id): + """Generate a Redis key for logs based on the provided session_id. + + Parameters: + session_id (str): The session identifier to include in the Redis key. 
+
+        Returns:
+            str: The constructed Redis key.
+        """
+        return f"logs:{session_id}"
diff --git a/backend/logs_helper/serializers.py b/backend/logs_helper/serializers.py
new file mode 100644
index 000000000..35b2a0046
--- /dev/null
+++ b/backend/logs_helper/serializers.py
@@ -0,0 +1,5 @@
+from rest_framework import serializers
+
+
+class StoreLogMessagesSerializer(serializers.Serializer):
+    log = serializers.CharField()
diff --git a/backend/logs_helper/urls.py b/backend/logs_helper/urls.py
new file mode 100644
index 000000000..68c4db3c1
--- /dev/null
+++ b/backend/logs_helper/urls.py
@@ -0,0 +1,16 @@
+from django.urls import path
+from rest_framework.urlpatterns import format_suffix_patterns
+
+from .views import LogsHelperViewSet
+
+logs_helper = LogsHelperViewSet.as_view({"get": "get_logs", "post": "store_log"})
+
+urlpatterns = format_suffix_patterns(
+    [
+        path(
+            "",
+            logs_helper,
+            name="logs-helper",
+        ),
+    ]
+)
diff --git a/backend/logs_helper/views.py b/backend/logs_helper/views.py
new file mode 100644
index 000000000..ce3f632fc
--- /dev/null
+++ b/backend/logs_helper/views.py
@@ -0,0 +1,68 @@
+import json
+import logging
+from datetime import datetime, timezone
+
+from django.conf import settings
+from django.http import HttpRequest
+from rest_framework import status, viewsets
+from rest_framework.decorators import action
+from rest_framework.response import Response
+from utils.cache_service import CacheService
+from utils.user_session import UserSessionUtils
+
+from .log_service import LogService
+from .serializers import StoreLogMessagesSerializer
+
+logger = logging.getLogger(__name__)
+
+
+class LogsHelperViewSet(viewsets.ModelViewSet):
+    """Viewset to store and retrieve a user session's logs and notifications."""
+
+    @action(detail=False, methods=["get"])
+    def get_logs(self, request: HttpRequest) -> Response:
+        # Extract the session ID
+        session_id: str = UserSessionUtils.get_session_id(request=request)
+
+        # Construct the Redis key pattern to match keys
+        # associated with the session ID
+        redis_key = LogService.generate_redis_key(session_id=session_id)
+
+        # Retrieve keys matching the pattern
+        keys = CacheService.get_all_keys(f"{redis_key}*")
+
+        # Retrieve the values corresponding to those keys
+        logs = []
+        for key in keys:
+            log_data = CacheService.get_key(key)
+            logs.append(log_data)
+
+        # Sort logs based on timestamp
+        sorted_logs = sorted(logs, key=lambda x: x["timestamp"])
+
+        return Response({"data": sorted_logs}, status=status.HTTP_200_OK)
+
+    # This API will be triggered whenever a notification message
+    # pops up in the UI.
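+    # Each accepted log is written to Redis under a key of the form
+    #   logs:<session_id>:<timestamp>
+    # and expires after LOGS_EXPIRATION_TIME_IN_SECOND seconds, so the
+    # UI can re-fetch the session's notifications after a page reload.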
+ @action(detail=False, methods=["post"]) + def store_log(self, request: HttpRequest) -> Response: + """Store log message in Redis.""" + # Extract the session ID + logs_expiry = settings.LOGS_EXPIRATION_TIME_IN_SECOND + session_id: str = UserSessionUtils.get_session_id(request=request) + + serializer = StoreLogMessagesSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + # Extract the log message from the validated data + log: str = serializer.validated_data.get("log") + log_data = json.loads(log) + timestamp = datetime.now(timezone.utc).timestamp() + + redis_key = ( + f"{LogService.generate_redis_key(session_id=session_id)}:{timestamp}" + ) + + CacheService.set_key(redis_key, log_data, logs_expiry) + + return Response({"message": "Successfully stored the message in redis"}) diff --git a/backend/sample.env b/backend/sample.env index 06ba0cfbf..66e780dda 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -124,6 +124,8 @@ ENABLE_LOG_HISTORY=True LOG_HISTORY_CONSUMER_INTERVAL=30 # Maximum number of logs to insert in a single batch. LOGS_BATCH_LIMIT=30 +# Logs Expiry of 24 hours +LOGS_EXPIRATION_TIME_IN_SECOND=86400 # Celery Configuration CELERY_BROKER_URL = "redis://unstract-redis:6379" diff --git a/backend/utils/cache_service.py b/backend/utils/cache_service.py index 691b8ee2a..82107a7fa 100644 --- a/backend/utils/cache_service.py +++ b/backend/utils/cache_service.py @@ -28,6 +28,12 @@ def set_key( expire, ) + @staticmethod + def get_all_keys(key_pattern: str) -> Any: + keys = redis_cache.keys(key_pattern) + # Ensure all keys are strings + return [key.decode("utf-8") if isinstance(key, bytes) else key for key in keys] + @staticmethod def clear_cache(key_pattern: str) -> Any: """Delete keys in bulk based on the key pattern.""" diff --git a/backend/utils/user_session.py b/backend/utils/user_session.py index c5c5a9452..72131cffc 100644 --- a/backend/utils/user_session.py +++ b/backend/utils/user_session.py @@ -29,6 +29,10 @@ def set_organization_member_role( ) -> None: request.session["role"] = member.role + @staticmethod + def get_session_id(request: HttpRequest) -> Optional[str]: + return request.session.session_key + @staticmethod def get_organization_member_role(request: HttpRequest) -> Optional[str]: return request.session.get("role") diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index f907cfbf5..4f08dac19 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -10,6 +10,7 @@ import PostHogPageviewTracker from "./PostHogPageviewTracker.js"; import { PageTitle } from "./components/widgets/page-title/PageTitle.jsx"; import { useEffect } from "react"; import CustomMarkdown from "./components/helpers/custom-markdown/CustomMarkdown.jsx"; +import { useSocketLogsStore } from "./store/socket-logs-store.js"; let GoogleTagManagerHelper; try { @@ -24,6 +25,7 @@ function App() { const { defaultAlgorithm, darkAlgorithm } = theme; const { sessionDetails } = useSessionStore(); const { alertDetails } = useAlertStore(); + const { pushLogMessages } = useSocketLogsStore(); const btn = ( <> @@ -55,6 +57,15 @@ function App() { btn, key: alertDetails?.key, }); + + pushLogMessages([ + { + timestamp: Math.floor(Date.now() / 1000), + level: alertDetails?.type ? 
alertDetails?.type.toUpperCase() : "", + message: alertDetails.content, + type: "NOTIFICATION", + }, + ]); }, [alertDetails]); return ( diff --git a/frontend/src/components/agency/agency/Agency.jsx b/frontend/src/components/agency/agency/Agency.jsx index 9215c996b..74fd98528 100644 --- a/frontend/src/components/agency/agency/Agency.jsx +++ b/frontend/src/components/agency/agency/Agency.jsx @@ -1,10 +1,5 @@ -import { Button, Collapse, Layout, Modal } from "antd"; -import { - FullscreenExitOutlined, - FullscreenOutlined, - LeftOutlined, - RightOutlined, -} from "@ant-design/icons"; +import { Button, Layout } from "antd"; +import { LeftOutlined, RightOutlined } from "@ant-design/icons"; import Sider from "antd/es/layout/Sider"; import { useEffect, useState } from "react"; @@ -12,17 +7,13 @@ import { IslandLayout } from "../../../layouts/island-layout/IslandLayout"; import { Actions } from "../actions/Actions"; import { WorkflowExecution } from "../workflow-execution/WorkflowExecution"; import "./Agency.css"; -import { useSocketLogsStore } from "../../../store/socket-logs-store"; import { useSocketMessagesStore } from "../../../store/socket-messages-store"; import { useWorkflowStore } from "../../../store/workflow-store"; -import { LogsLabel } from "../logs-label/LogsLabel"; import { SidePanel } from "../side-panel/SidePanel"; -import { DisplayLogs } from "../display-logs/DisplayLogs"; import { PageTitle } from "../../widgets/page-title/PageTitle"; function Agency() { const [isCollapsed, setIsCollapsed] = useState(false); - const [activeKey, setActiveKey] = useState([]); const [steps, setSteps] = useState([]); const [inputMd, setInputMd] = useState(""); const [outputMd, setOutputMd] = useState(""); @@ -30,7 +21,6 @@ function Agency() { const [sourceMsg, setSourceMsg] = useState(""); const [destinationMsg, setDestinationMsg] = useState(""); const { message, setDefault } = useSocketMessagesStore(); - const { emptyLogs } = useSocketLogsStore(); const workflowStore = useWorkflowStore(); const { details, loadingType, projectName } = workflowStore; const prompt = details?.prompt_text; @@ -38,42 +28,6 @@ function Agency() { const [prevLoadingType, setPrevLoadingType] = useState(""); const [isUpdateSteps, setIsUpdateSteps] = useState(false); const [stepLoader, setStepLoader] = useState(false); - const [showLogsModal, setShowLogsModal] = useState(false); - - const openLogsModal = () => { - setShowLogsModal(true); - }; - - const closeLogsModal = () => { - setShowLogsModal(false); - }; - - const genExtra = () => ( - { - // If you don't want click extra trigger collapse, you can prevent this: - openLogsModal(); - event.stopPropagation(); - }} - /> - ); - - const getItems = () => [ - { - key: "1", - label: activeKey?.length > 0 ? : "Logs", - children: ( -
- -
- ), - extra: genExtra(), - }, - ]; - - const handleCollapse = (keys) => { - setActiveKey(keys); - }; useEffect(() => { if (prevLoadingType !== "EXECUTE") { @@ -103,7 +57,6 @@ function Agency() { setOutputMd(""); setStatusBarMsg(""); setDefault(); - emptyLogs(); setSourceMsg(""); setDestinationMsg(""); }; @@ -112,7 +65,6 @@ function Agency() { // Clean up function to clear all the socket messages return () => { setDefault(); - emptyLogs(); }; }, []); @@ -139,7 +91,6 @@ function Agency() { } if (msgComp === "SOURCE" && state === "RUNNING") { - setActiveKey(""); setSourceMsg(""); setDestinationMsg(""); const newSteps = [...steps].map((step) => { @@ -227,31 +178,7 @@ function Agency() { stepLoader={stepLoader} /> -
- - } - > - -
- -
-
-
+
); } diff --git a/frontend/src/components/custom-tools/tool-ide/ToolIde.jsx b/frontend/src/components/custom-tools/tool-ide/ToolIde.jsx index 58083a2da..b5fc08aa7 100644 --- a/frontend/src/components/custom-tools/tool-ide/ToolIde.jsx +++ b/frontend/src/components/custom-tools/tool-ide/ToolIde.jsx @@ -1,5 +1,4 @@ -import { FullscreenExitOutlined, FullscreenOutlined } from "@ant-design/icons"; -import { Col, Collapse, Modal, Row } from "antd"; +import { Col, Row } from "antd"; import { useState, useEffect } from "react"; import { useAxiosPrivate } from "../../../hooks/useAxiosPrivate"; @@ -7,10 +6,8 @@ import { useExceptionHandler } from "../../../hooks/useExceptionHandler"; import { useAlertStore } from "../../../store/alert-store"; import { useCustomToolStore } from "../../../store/custom-tool-store"; import { useSessionStore } from "../../../store/session-store"; -import { DisplayLogs } from "../display-logs/DisplayLogs"; import { DocumentManager } from "../document-manager/DocumentManager"; import { Header } from "../header/Header"; -import { LogsLabel } from "../logs-label/LogsLabel"; import { SettingsModal } from "../settings-modal/SettingsModal"; import { ToolsMain } from "../tools-main/ToolsMain"; import "./ToolIde.css"; @@ -49,8 +46,6 @@ try { // Do nothing if plugins are not loaded. } function ToolIde() { - const [showLogsModal, setShowLogsModal] = useState(false); - const [activeKey, setActiveKey] = useState([]); const [openSettings, setOpenSettings] = useState(false); const { details, @@ -75,13 +70,6 @@ function ToolIde() { const [openShareModal, setOpenShareModal] = useState(false); const [openCloneModal, setOpenCloneModal] = useState(false); - const openLogsModal = () => { - setShowLogsModal(true); - }; - - const closeLogsModal = () => { - setShowLogsModal(false); - }; useEffect(() => { if (openShareModal) { if (shareId) { @@ -101,32 +89,6 @@ function ToolIde() { } }, [openShareModal]); - const genExtra = () => ( - { - openLogsModal(); - event.stopPropagation(); - }} - /> - ); - - const getItems = () => [ - { - key: "1", - label: activeKey?.length > 0 ? : "Logs", - children: ( -
- -
- ), - extra: genExtra(), - }, - ]; - - const handleCollapse = (keys) => { - setActiveKey(keys); - }; - const generateIndex = async (doc) => { const docId = doc?.document_id; @@ -264,31 +226,9 @@ function ToolIde() { -
- -
- } - open={showLogsModal} - onCancel={closeLogsModal} - className="agency-ide-log-modal" - footer={null} - width={1400} - closeIcon={} - > -
- -
-
+
[ - { - key: "1", - label: activeKey?.length > 0 ? : "Logs", - children: ( -
- - - -
- ), - }, - ]; - - const handleCollapse = (keys) => { - setActiveKey(keys); - }; if (isTableLoading) { return ; } @@ -56,19 +35,6 @@ function Body({ type, columns, tableData, isTableLoading, openAddModal }) { }} />
- {deploymentsStaticContent[type].isLogsRequired && ( - <> -
- - - )}
); diff --git a/frontend/src/components/helpers/socket-messages/SocketMessages.js b/frontend/src/components/helpers/socket-messages/SocketMessages.js index ce1ce2a31..aa8224aa5 100644 --- a/frontend/src/components/helpers/socket-messages/SocketMessages.js +++ b/frontend/src/components/helpers/socket-messages/SocketMessages.js @@ -1,4 +1,11 @@ -import { useContext, useEffect, useRef, useState, useCallback } from "react"; +import { + useContext, + useEffect, + useRef, + useState, + useCallback, + useMemo, +} from "react"; import throttle from "lodash/throttle"; import { SocketContext } from "../../../helpers/SocketContext"; @@ -6,11 +13,10 @@ import { useExceptionHandler } from "../../../hooks/useExceptionHandler"; import { useAlertStore } from "../../../store/alert-store"; import { useSocketLogsStore } from "../../../store/socket-logs-store"; import { useSocketMessagesStore } from "../../../store/socket-messages-store"; -import { useSocketCustomToolStore } from "../../../store/socket-custom-tool"; import { useSessionStore } from "../../../store/session-store"; import { useUsageStore } from "../../../store/usage-store"; -const THROTTLE_DELAY = 2000; // 2 seconds +const THROTTLE_DELAY = 2000; function SocketMessages() { const [logId, setLogId] = useState(""); @@ -22,105 +28,95 @@ function SocketMessages() { setPointer, } = useSocketMessagesStore(); const { pushLogMessages } = useSocketLogsStore(); - const { updateCusToolMessages } = useSocketCustomToolStore(); const { sessionDetails } = useSessionStore(); const socket = useContext(SocketContext); const { setAlertDetails } = useAlertStore(); const handleException = useExceptionHandler(); const { setLLMTokenUsage } = useUsageStore(); - // Buffer to hold the logs between throttle intervals - const psLogs = useRef([]); - const wfLogs = useRef([]); + // Buffer to hold logs between throttle intervals + const logBufferRef = useRef([]); useEffect(() => { setLogId(sessionDetails?.logEventsId || ""); }, [sessionDetails]); - // Throttled function for PS logs - const psLogsThrottledUpdate = useRef( - throttle((psLogMessages) => { - updateCusToolMessages(psLogMessages); - psLogs.current = []; - }, THROTTLE_DELAY) - ).current; - - // Throttled function for WF logs - const wfLogsThrottledUpdate = useRef( - throttle((wfLogMessages) => { - pushLogMessages(wfLogMessages); - wfLogs.current = []; - }, THROTTLE_DELAY) - ).current; - - // Clean up throttling functions on unmount + // Throttled function that batches log messages + const logMessagesThrottledUpdate = useMemo( + () => + throttle((logsBatch) => { + if (!logsBatch.length) return; + pushLogMessages(logsBatch); + logBufferRef.current = []; + }, THROTTLE_DELAY), + [pushLogMessages] + ); + + // Clean up throttling on unmount useEffect(() => { - return () => { - psLogsThrottledUpdate.cancel(); - wfLogsThrottledUpdate.cancel(); - }; - }, [psLogsThrottledUpdate, wfLogsThrottledUpdate]); + return () => logMessagesThrottledUpdate.cancel(); + }, [logMessagesThrottledUpdate]); - const handlePsLogs = useCallback( + // Batches log messages, then invokes the throttled function + const handleLogMessages = useCallback( (msg) => { - psLogs.current = [...psLogs.current, msg]; - psLogsThrottledUpdate(psLogs.current); + logBufferRef.current = [...logBufferRef.current, msg]; + logMessagesThrottledUpdate(logBufferRef.current); }, - [psLogsThrottledUpdate] + [logMessagesThrottledUpdate] ); - const handleWfLogs = useCallback( - (msg) => { - wfLogs.current = [...wfLogs.current, msg]; - wfLogsThrottledUpdate(wfLogs.current); - 
}, - [wfLogsThrottledUpdate] - ); - // Handle incoming socket messages - const onMessage = (data) => { - try { - let msg = data.data; - // Attempt to decode data as JSON if it's in encoded state - if (typeof msg === "string" || msg instanceof Uint8Array) { - if (typeof msg === "string") { - msg = JSON.parse(msg); - } else { - msg = JSON.parse(new TextDecoder().decode(msg)); + // Socket message handler + const onMessage = useCallback( + (data) => { + try { + let msg = data.data; + + if (typeof msg === "string" || msg instanceof Uint8Array) { + msg = + typeof msg === "string" + ? JSON.parse(msg) + : JSON.parse(new TextDecoder().decode(msg)); } + + if ( + (msg?.type === "LOG" || msg?.type === "COST") && + msg?.service !== "prompt" + ) { + msg.message = msg?.log; + handleLogMessages(msg); + } else if (msg?.type === "UPDATE") { + pushStagedMessage(msg); + } else if (msg?.type === "LOG" && msg?.service === "prompt") { + handleLogMessages(msg); + } + + if (msg?.type === "LOG" && msg?.service === "usage") { + const remainingTokens = + msg?.max_token_count_set - msg?.added_token_count; + setLLMTokenUsage(Math.max(remainingTokens, 0)); + } + } catch (err) { + setAlertDetails( + handleException(err, "Failed to process socket message") + ); } - if ( - (msg?.type === "LOG" || msg?.type === "COST") && - msg?.service !== "prompt" - ) { - msg.message = msg?.log; - handleWfLogs(msg); - } else if (msg?.type === "UPDATE") { - pushStagedMessage(msg); - } else if (msg?.type === "LOG" && msg?.service === "prompt") { - handlePsLogs(msg); - } - if (msg?.type === "LOG" && msg?.service === "usage") { - const remainingTokens = - msg?.max_token_count_set - msg?.added_token_count; - setLLMTokenUsage(Math.max(remainingTokens, 0)); - } - } catch (err) { - setAlertDetails(handleException(err, "Failed to process socket message")); - } - }; + }, + [handleLogMessages, pushStagedMessage] + ); + // Subscribe/unsubscribe to the socket channel useEffect(() => { if (!logId) return; - const logMessageChannel = `logs:${logId}`; - socket.on(logMessageChannel, onMessage); - + const channel = `logs:${logId}`; + socket.on(channel, onMessage); return () => { - // unsubscribe to the channel to stop listening the socket messages for the logId - socket.off(logMessageChannel); + socket.off(channel, onMessage); }; - }, [logId]); + }, [socket, logId, onMessage]); + // Process staged messages sequentially useEffect(() => { if (pointer > stagedMessages?.length - 1) return; @@ -130,8 +126,10 @@ function SocketMessages() { setPointer(pointer + 1); }, 0); - return () => clearTimeout(timer); // Cleanup timer on unmount - }, [stagedMessages, pointer]); + return () => clearTimeout(timer); + }, [stagedMessages, pointer, setPointer, updateMessage]); + + return null; } export { SocketMessages }; diff --git a/frontend/src/components/logs-and-notifications/DisplayLogsAndNotifications.css b/frontend/src/components/logs-and-notifications/DisplayLogsAndNotifications.css new file mode 100644 index 000000000..2db63dc25 --- /dev/null +++ b/frontend/src/components/logs-and-notifications/DisplayLogsAndNotifications.css @@ -0,0 +1,55 @@ +.logs-container { + position: absolute; + bottom: 0; + left: 0; + right: 0; + background-color: #fff; + border-top: 1px solid #ccc; + box-shadow: 0 -2px 5px rgba(0, 0, 0, 0.1); + display: flex; + flex-direction: column; + z-index: 999; + transition: height 0.1s; +} + +.logs-handle { + height: 40px; + background-color: #eee; + border-bottom: 1px solid #ccc; + cursor: ns-resize; + user-select: none; + font-weight: bold; +} + 
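+/* The 40px handle height matches the offsets subtracted in
+   DisplayLogsAndNotifications.jsx when computing the expanded
+   content height. */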
+.logs-header-container { + height: 40px; + display: flex; + align-items: center; + justify-content: space-between; + padding: 0 10px; +} + +.logs-content { + flex: 1; + overflow: auto; +} + +.tool-logs-table .ant-table { + font-family: Consolas, "Courier New", monospace; + font-size: 12px !important; + background-color: #fff; +} + +.tool-logs-table .ant-table-thead > tr > th { + background-color: #ffffff; + font-weight: 600; +} + +.tool-logs-table .ant-table-tbody > tr.display-logs-error-bg > td { + background-color: #fff1f0; /* a light red for error logs */ +} + +.display-logs-md { + font-size: 12px; + padding-left: 5px; +} diff --git a/frontend/src/components/logs-and-notifications/DisplayLogsAndNotifications.jsx b/frontend/src/components/logs-and-notifications/DisplayLogsAndNotifications.jsx new file mode 100644 index 000000000..52baca3d8 --- /dev/null +++ b/frontend/src/components/logs-and-notifications/DisplayLogsAndNotifications.jsx @@ -0,0 +1,130 @@ +import { useState, useRef, useCallback, useEffect } from "react"; +import { LogsHeader } from "./LogsHeader"; +import "./DisplayLogsAndNotifications.css"; +import { LogsAndNotificationsTable } from "./LogsAndNotificationsTable"; +import { useAxiosPrivate } from "../../hooks/useAxiosPrivate"; +import { useSessionStore } from "../../store/session-store"; +import { useSocketLogsStore } from "../../store/socket-logs-store"; +import { useAlertStore } from "../../store/alert-store"; +import { useExceptionHandler } from "../../hooks/useExceptionHandler"; + +export function DisplayLogsAndNotifications() { + const [contentHeight, setContentHeight] = useState(0); + const [errorCount, setErrorCount] = useState(0); + const axiosPrivate = useAxiosPrivate(); + const { sessionDetails } = useSessionStore(); + const { pushLogMessages } = useSocketLogsStore(); + const { setAlertDetails } = useAlertStore(); + const handleException = useExceptionHandler(); + + const draggingRef = useRef(false); + const startYRef = useRef(0); + const startHeightRef = useRef(0); + const containerRef = useRef(null); + + useEffect(() => { + getLogs(); + const parent = containerRef.current?.parentElement; + if (parent) { + if (window.getComputedStyle(parent).position === "static") { + parent.style.position = "relative"; + } + parent.style.overflow = "hidden"; + } + }, []); + + const getLogs = async () => { + const requestOptions = { + method: "GET", + url: `/api/v1/unstract/${sessionDetails?.orgId}/logs/`, + headers: { + "X-CSRFToken": sessionDetails?.csrfToken, + }, + }; + + axiosPrivate(requestOptions) + .then((res) => { + const data = res?.data?.data || []; + pushLogMessages(data, false); + }) + .catch((err) => { + setAlertDetails(handleException(err)); + }); + }; + + const getParentHeight = useCallback(() => { + const parent = containerRef.current?.parentElement; + return parent ? 
parent.getBoundingClientRect().height : 0; + }, []); + + const minimize = useCallback(() => { + setContentHeight(0); + }, []); + + const semiExpand = useCallback(() => { + const parentHeight = getParentHeight(); + const semiHeight = 0.3 * parentHeight; + const newHeight = Math.max(0, semiHeight - 40); + setContentHeight(newHeight); + }, [getParentHeight]); + + const fullExpand = useCallback(() => { + const parentHeight = getParentHeight(); + const newHeight = Math.max(0, parentHeight - 40); + setContentHeight(newHeight); + }, [getParentHeight]); + + const onMouseDown = useCallback( + (e) => { + draggingRef.current = true; + startYRef.current = e.clientY; + startHeightRef.current = contentHeight; + document.addEventListener("mousemove", onMouseMove); + document.addEventListener("mouseup", onMouseUp); + }, + [contentHeight] + ); + + const onMouseMove = useCallback( + (e) => { + if (!draggingRef.current) return; + const diff = startYRef.current - e.clientY; + const newHeight = startHeightRef.current + diff; + const parentHeight = getParentHeight(); + const maxHeight = parentHeight - 40; + setContentHeight(Math.max(0, Math.min(maxHeight, newHeight))); + }, + [getParentHeight] + ); + + const onMouseUp = useCallback(() => { + draggingRef.current = false; + document.removeEventListener("mousemove", onMouseMove); + document.removeEventListener("mouseup", onMouseUp); + }, [onMouseMove, getParentHeight, contentHeight]); + + return ( +
+
+ +
+
+ +
+
+ ); +} diff --git a/frontend/src/components/logs-and-notifications/LogsAndNotificationsTable.jsx b/frontend/src/components/logs-and-notifications/LogsAndNotificationsTable.jsx new file mode 100644 index 000000000..61b848c18 --- /dev/null +++ b/frontend/src/components/logs-and-notifications/LogsAndNotificationsTable.jsx @@ -0,0 +1,139 @@ +import { useMemo, useEffect, useRef } from "react"; +import PropTypes from "prop-types"; +import { Table } from "antd"; +import { uniqueId } from "lodash"; +import { useSocketLogsStore } from "../../store/socket-logs-store"; +import "./DisplayLogsAndNotifications.css"; +import CustomMarkdown from "../helpers/custom-markdown/CustomMarkdown"; + +function LogsAndNotificationsTable({ errorCount, setErrorCount, isMinimized }) { + const tableRef = useRef(null); + const { logs } = useSocketLogsStore(); + + useEffect(() => { + if (!isMinimized && errorCount !== 0) { + setErrorCount(0); + } + }, [isMinimized]); + + const dataSource = useMemo(() => { + const logMessages = logs.map((log) => { + if (log?.level === "ERROR" && log?.type === "LOG" && isMinimized) { + setErrorCount((prev) => prev + 1); + } + return { + key: `${log.timestamp}-${uniqueId()}`, + time: log?.timestamp, + level: log?.level, + type: log?.type, + stage: log?.stage, + step: log?.step, + state: log?.state, + promptKey: log?.component?.prompt_key, + docName: log?.component?.doc_name, + message: ( + + ), + }; + }); + return logMessages; + }, [logs]); + + const columns = useMemo( + () => [ + { + title: "Time", + dataIndex: "time", + key: "time", + }, + { + title: "Level", + dataIndex: "level", + key: "level", + }, + { + title: "Type", + dataIndex: "type", + key: "type", + filters: [ + { text: "LOG", value: "LOG" }, + { text: "NOTIFICATION", value: "NOTIFICATION" }, + ], + defaultFilteredValue: [], + filterMultiple: true, + onFilter: (value, record) => record.type === value, + }, + { + title: "Stage", + dataIndex: "stage", + key: "stage", + }, + { + title: "Step", + dataIndex: "step", + key: "step", + }, + { + title: "State", + dataIndex: "state", + key: "state", + }, + { + title: "Prompt Key", + dataIndex: "promptKey", + key: "promptKey", + }, + { + title: "Doc Name", + dataIndex: "docName", + key: "docName", + }, + { + title: "Message", + dataIndex: "message", + key: "message", + }, + ], + [] + ); + + useEffect(() => { + if (logs?.length && tableRef.current) { + const body = tableRef.current.querySelector(".ant-table-body"); + if (body) { + body.scrollTo({ + top: body.scrollHeight, + behavior: "smooth", + }); + } + } + }, [logs]); + + const rowClassName = (record) => { + return record.level === "ERROR" ? "display-logs-error-bg" : ""; + }; + + return ( +
+ + + ); +} + +LogsAndNotificationsTable.propTypes = { + errorCount: PropTypes.number.isRequired, + setErrorCount: PropTypes.func.isRequired, + isMinimized: PropTypes.bool.isRequired, +}; + +export { LogsAndNotificationsTable }; diff --git a/frontend/src/components/logs-and-notifications/LogsHeader.jsx b/frontend/src/components/logs-and-notifications/LogsHeader.jsx new file mode 100644 index 000000000..4e43797b8 --- /dev/null +++ b/frontend/src/components/logs-and-notifications/LogsHeader.jsx @@ -0,0 +1,57 @@ +import { memo } from "react"; +import PropTypes from "prop-types"; +import { + CloseOutlined, + FullscreenOutlined, + ShrinkOutlined, +} from "@ant-design/icons"; +import { Button, Space, Tag, Typography } from "antd"; + +export const LogsHeader = memo(function LogsHeader({ + isMinimized, + errorCount, + onSemiExpand, + onFullExpand, + onMinimize, +}) { + const semiIcon = ; + const fullIcon = ; + const minimizeIcon = ; + + return ( +
+ + Logs + {isMinimized && errorCount > 0 && {errorCount}} + + +
+ ); +}); + +LogsHeader.propTypes = { + isMinimized: PropTypes.bool.isRequired, + errorCount: PropTypes.number.isRequired, + onSemiExpand: PropTypes.func.isRequired, + onFullExpand: PropTypes.func.isRequired, + onMinimize: PropTypes.func.isRequired, +}; diff --git a/frontend/src/helpers/GetStaticData.js b/frontend/src/helpers/GetStaticData.js index 1acf00a24..a0af83971 100644 --- a/frontend/src/helpers/GetStaticData.js +++ b/frontend/src/helpers/GetStaticData.js @@ -235,23 +235,19 @@ const deploymentsStaticContent = { title: "Unstructured to Structured ETL Pipelines", modalTitle: "Deploy ETL Pipeline", addBtn: "ETL Pipeline", - isLogsRequired: true, }, task: { title: "Unstructured to Structured Task Pipelines", modalTitle: "Deploy Task Pipeline", addBtn: "Task Pipeline", - isLogsRequired: true, }, api: { title: "API Deployments", addBtn: "API Deployment", - isLogsRequired: false, }, app: { title: "App Deployments", addBtn: "App Deployment", - isLogsRequired: false, }, }; @@ -279,13 +275,19 @@ const getTimeForLogs = () => { }; const getDateTimeString = (timestamp) => { - // Convert to milliseconds + // Check if the timestamp is a valid number + if (typeof timestamp !== "number" || isNaN(timestamp) || timestamp <= 0) { + return timestamp; + } + const timestampInMilliseconds = timestamp * 1000; - // Create a new Date object const date = new Date(timestampInMilliseconds); - // Extract date components + if (isNaN(date.getTime())) { + return timestamp; + } + const year = date.getFullYear(); const month = (date.getMonth() + 1).toString().padStart(2, "0"); // Months are zero-indexed const day = date.getDate().toString().padStart(2, "0"); @@ -586,6 +588,20 @@ const TRIAL_PLAN = "TRIAL"; const homePagePath = cloudHomePagePath || "tools"; +const convertTimestampToHHMMSS = (timestamp) => { + // Convert the timestamp to milliseconds + const date = new Date(timestamp * 1000); + + // Extract hours, minutes, and seconds + const [hours, minutes, seconds] = [ + date.getUTCHours(), + date.getUTCMinutes(), + date.getUTCSeconds(), + ].map((unit) => unit.toString().padStart(2, "0")); + // Return the formatted time string + return `${hours}:${minutes}:${seconds}`; +}; + const UNSTRACT_ADMIN = "unstract_admin"; export { @@ -641,5 +657,6 @@ export { generateCoverageKey, TRIAL_PLAN, homePagePath, + convertTimestampToHHMMSS, UNSTRACT_ADMIN, }; diff --git a/frontend/src/index.css b/frontend/src/index.css index 91eeef047..6d74f213e 100644 --- a/frontend/src/index.css +++ b/frontend/src/index.css @@ -127,6 +127,18 @@ body { height: 100%; } +.height-20 { + height: 20px; +} + +.height-40 { + height: 40px; +} + +.height-50 { + height: 50px; +} + .pad-right-6 { padding-right: 6px; } diff --git a/frontend/src/layouts/page-layout/PageLayout.jsx b/frontend/src/layouts/page-layout/PageLayout.jsx index e03b1ef91..1eb872398 100644 --- a/frontend/src/layouts/page-layout/PageLayout.jsx +++ b/frontend/src/layouts/page-layout/PageLayout.jsx @@ -7,6 +7,7 @@ import "./PageLayout.css"; import SideNavBar from "../../components/navigations/side-nav-bar/SideNavBar.jsx"; import { TopNavBar } from "../../components/navigations/top-nav-bar/TopNavBar.jsx"; +import { DisplayLogsAndNotifications } from "../../components/logs-and-notifications/DisplayLogsAndNotifications.jsx"; function PageLayout({ sideBarOptions, topNavBarOptions }) { const initialCollapsedValue = @@ -30,6 +31,8 @@ function PageLayout({ sideBarOptions, topNavBarOptions }) { className="collapse_btn" /> +
+        <DisplayLogsAndNotifications />
diff --git a/frontend/src/store/alert-store.js b/frontend/src/store/alert-store.js index d066368f8..0e42186b5 100644 --- a/frontend/src/store/alert-store.js +++ b/frontend/src/store/alert-store.js @@ -1,30 +1,35 @@ import { create } from "zustand"; import { isNonNegativeNumber } from "../helpers/GetStaticData"; +import { uniqueId } from "lodash"; + +const DEFAULT_DURATION = 6; const STORE_VARIABLES = { alertDetails: { type: "", content: "", title: "", - duration: undefined, + duration: DEFAULT_DURATION, key: null, }, }; + const useAlertStore = create((setState) => ({ ...STORE_VARIABLES, setAlertDetails: (details) => { - setState(() => { - if (!details) return STORE_VARIABLES; - const isErrorType = details?.type === "error"; - details["title"] = details?.title || (isErrorType ? "Failed" : "Success"); - details["duration"] = isNonNegativeNumber(details?.duration) - ? details?.duration - : isErrorType - ? 0 - : undefined; - details["key"] = `open${Date.now()}`; - return { alertDetails: { ...details } }; - }); + if (!details) return STORE_VARIABLES; + + const isErrorType = details?.type === "error"; + const updatedDetails = { + ...details, + title: details.title || (isErrorType ? "Failed" : "Success"), + duration: isNonNegativeNumber(details.duration) + ? details.duration + : DEFAULT_DURATION, + key: `open${Date.now()}-${uniqueId()}`, + }; + + setState({ alertDetails: updatedDetails }); }, })); diff --git a/frontend/src/store/socket-logs-store.js b/frontend/src/store/socket-logs-store.js index 1b78389a2..bf4cfd173 100644 --- a/frontend/src/store/socket-logs-store.js +++ b/frontend/src/store/socket-logs-store.js @@ -1,6 +1,8 @@ import { create } from "zustand"; -import { getTimeForLogs } from "../helpers/GetStaticData"; +import { useSessionStore } from "./session-store"; +import axios from "axios"; +import { getDateTimeString } from "../helpers/GetStaticData"; const STORE_VARIABLES = { logs: [], @@ -8,34 +10,56 @@ const STORE_VARIABLES = { const useSocketLogsStore = create((setState, getState) => ({ ...STORE_VARIABLES, - pushLogMessages: (messages) => { + pushLogMessages: (messages, isStoreNotifications = true) => { const existingState = { ...getState() }; + const { sessionDetails } = useSessionStore.getState(); let logsData = [...(existingState?.logs || [])]; const newLogs = messages.map((msg, index) => ({ - timestamp: getTimeForLogs(), + timestamp: getDateTimeString(msg?.timestamp), key: logsData?.length + index + 1, level: msg?.level, stage: msg?.stage, step: msg?.step, + state: msg?.state, + prompt_key: msg?.component?.prompt_key, + doc_name: msg?.component?.doc_name, message: msg?.message, - cost_type: msg?.cost_type, - cost_units: msg?.cost_units, cost_value: msg?.cost, iteration: msg?.iteration, iteration_total: msg?.iteration_total, + type: msg?.type, })); logsData = [...logsData, ...newLogs]; - // Remove the previous logs if the length exceeds 200 + newLogs.forEach((newLog) => { + if ( + newLog?.type === "NOTIFICATION" && + sessionDetails?.isLoggedIn && + isStoreNotifications + ) { + const requestOptions = { + method: "POST", + url: `/api/v1/unstract/${sessionDetails?.orgId}/logs/`, + headers: { + "X-CSRFToken": sessionDetails?.csrfToken, + }, + data: { log: JSON.stringify(newLog) }, + }; + axios(requestOptions).catch((err) => {}); + } + }); + + // Remove the previous logs if the length exceeds 1000 const logsDataLength = logsData?.length; - if (logsDataLength > 200) { - const index = logsDataLength - 200; + if (logsDataLength > 1000) { + const index = logsDataLength - 1000; logsData = 
logsData.slice(index);
     }
     existingState.logs = logsData;
+
     setState(existingState);
   },
   emptyLogs: () => {
diff --git a/unstract/core/src/unstract/core/pubsub_helper.py b/unstract/core/src/unstract/core/pubsub_helper.py
index dcbe6be37..0ea0a2523 100644
--- a/unstract/core/src/unstract/core/pubsub_helper.py
+++ b/unstract/core/src/unstract/core/pubsub_helper.py
@@ -1,16 +1,24 @@
+import json
 import logging
 import os
 from datetime import datetime, timezone
 from typing import Any, Optional
 
+import redis
 from kombu import Connection
 
 from unstract.core.constants import LogEventArgument, LogProcessingTask
 
 
 class LogPublisher:
     kombu_conn = Connection(os.environ.get("CELERY_BROKER_URL"))
+    r = redis.Redis(
+        host=os.environ.get("REDIS_HOST"),
+        port=os.environ.get("REDIS_PORT"),
+        username=os.environ.get("REDIS_USER"),
+        password=os.environ.get("REDIS_PASSWORD"),
+    )
 
     @staticmethod
     def log_usage(
@@ -118,8 +126,10 @@ def _get_task_header(cls, task_name: str) -> dict[str, Any]:
 
     @classmethod
     def publish(cls, channel_id: str, payload: dict[str, Any]) -> bool:
         """Publish a message to the queue."""
+        channel = f"logs:{channel_id}"
         try:
+
             with cls.kombu_conn.Producer(serializer="json") as producer:
                 event = f"logs:{channel_id}"
                 task_message = cls._get_task_message(
@@ -138,6 +148,22 @@ def publish(cls, channel_id: str, payload: dict[str, Any]) -> bool:
                     retry=True,
                 )
                 logging.debug(f"Published '{channel_id}' <= {payload}")
+                log_data = json.dumps(payload)
+                # Persist only payloads of type "LOG"
+                if payload["type"] == "LOG":
+                    logs_expiration = int(
+                        os.environ.get("LOGS_EXPIRATION_TIME_IN_SECOND")
+                    )
+
+                    # Extract timestamp from payload
+                    timestamp = payload["timestamp"]
+
+                    # Construct Redis key using channel and timestamp
+                    redis_key = f"{channel}:{timestamp}"
+
+                    # Store the log in Redis with the configured expiration
+                    cls.r.setex(redis_key, logs_expiration, log_data)
+
         except Exception as e:
             logging.error(f"Failed to publish '{channel_id}' <= {payload}: {e}")
             return False