diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
index bbc12ba6cc..b4bfaa2b44 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
@@ -887,6 +887,8 @@ def start_webui(
         "ClientDir": str(container_webui_dir / "client"),
         "LogViewerDir": str(container_webui_dir / "yscope-log-viewer"),
         "StreamTargetUncompressedSize": container_clp_config.stream_output.target_uncompressed_size,
+        "ClpStorageEngine": clp_config.package.storage_engine,
+        "LsRoot": str(container_clp_config.logs_input.directory),
     }
 
     container_cmd_extra_opts = []
@@ -941,6 +943,7 @@ def start_webui(
     ]
     necessary_mounts = [
         mounts.clp_home,
+        mounts.input_logs_dir,
     ]
     if StorageType.S3 == stream_storage.type:
         auth = stream_storage.s3_config.aws_authentication
diff --git a/components/webui/client/src/api/compress/index.ts b/components/webui/client/src/api/compress/index.ts
new file mode 100644
index 0000000000..64c407abf6
--- /dev/null
+++ b/components/webui/client/src/api/compress/index.ts
@@ -0,0 +1,31 @@
+import axios, {AxiosResponse} from "axios";
+
+
+type CompressionJobCreationSchema = {
+    paths: string[];
+    dataset?: string;
+    timestampKey?: string;
+};
+
+type CompressionJobSchema = {
+    jobId: number;
+};
+
+/**
+ * Submits a compression job.
+ *
+ * @param payload
+ * @return The submitted job's ID.
+ */
+const submitCompressionJob = async (payload: CompressionJobCreationSchema): Promise<number> => {
+    console.log("Submitting compression job:", JSON.stringify(payload));
+
+    const response: AxiosResponse<CompressionJobSchema> = await axios.post("/api/compress", payload);
+
+    return response.data.jobId;
+};
+
+export type {
+    CompressionJobCreationSchema,
+    CompressionJobSchema,
+};
+export {submitCompressionJob};
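For orientation, a minimal sketch of the client-side call path. The handler name, import path, and literal values are illustrative only; `dataset` and `timestampKey` are honoured by the server only when the storage engine is clp-s.

import {submitCompressionJob} from "../api/compress";

// Illustrative: submit a job and surface the returned ID to the user.
const onSubmit = async () => {
    try {
        const jobId = await submitCompressionJob({
            paths: ["/var/log/app"],
            dataset: "default",
            timestampKey: "timestamp",
        });

        console.log(`Submitted compression job ${jobId}`);
    } catch (e: unknown) {
        console.error("Compression job submission failed", e);
    }
};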
diff --git a/components/webui/client/src/api/os/index.ts b/components/webui/client/src/api/os/index.ts
new file mode 100644
index 0000000000..bc97e573f2
--- /dev/null
+++ b/components/webui/client/src/api/os/index.ts
@@ -0,0 +1,31 @@
+import {
+    Static,
+    Type,
+} from "@sinclair/typebox";
+import axios, {AxiosResponse} from "axios";
+
+
+const FileListResponseSchema = Type.Array(
+    Type.Object({
+        isExpandable: Type.Boolean(),
+        name: Type.String(),
+        parentPath: Type.String(),
+    })
+);
+
+
+type FileListResponseSchemaType = Static<typeof FileListResponseSchema>;
+
+
+/**
+ * Lists files and directories at the specified path.
+ *
+ * @param path The path to list files for.
+ * @return The file listing response.
+ */
+const listFiles = async (path: string): Promise<AxiosResponse<FileListResponseSchemaType>> => {
+    return axios.get<FileListResponseSchemaType>(`/api/os/ls?path=${encodeURIComponent(path)}`);
+};
+
+export {listFiles};
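The response is a flat list of directory entries keyed by parent path, which maps directly onto antd's treeDataSimpleMode. A consumption sketch (helper name and import path are illustrative; the Compress component below does the same with extra de-duplication):

import {listFiles} from "../api/os";

// Illustrative: fetch one directory level and map it into antd
// treeDataSimpleMode flat nodes (id/pId links instead of nested children).
const fetchTreeNodes = async (dirPath: string) => {
    const {data} = await listFiles(dirPath);

    return data.map(({isExpandable, name, parentPath}) => {
        const prefix = parentPath.endsWith("/") ? parentPath : `${parentPath}/`;

        return {
            id: prefix + name,
            pId: parentPath,
            title: name,
            value: prefix + name,
            isLeaf: !isExpandable,
        };
    });
};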
+ */ + const loadMissingParents = useCallback(async (path: string): Promise => { + const pathSegments = path.split("/").filter((segment) => 0 < segment.length); + let currentPath = "/"; + + // Load root if not present + if (!treeData.some((node) => "/" === node.id)) { + const success = await fetchAndAppendTreeNodes("/"); + if (!success) { + return false; + } + } + + // Load each parent level + for (let i = 0; i < pathSegments.length; i++) { + const segment = pathSegments[i]; + const parentPath = currentPath; + currentPath = "/" === currentPath ? + `/${segment}` : + `${currentPath}/${segment}`; + + // Check if node already exists + if (!treeData.some((node) => node.id === currentPath)) { + const success = await fetchAndAppendTreeNodes(parentPath); + if (!success) { + return false; + } + } + } + + return true; + }, [treeData, + fetchAndAppendTreeNodes]); + + const handleLoadData = useCallback>(async (node) => { + const path = node.value; + if ("string" !== typeof path) { + return; + } + setIsLoading(true); + try { + await fetchAndAppendTreeNodes(path); + } finally { + setIsLoading(false); + } + }, [fetchAndAppendTreeNodes]); + + const handleSearch = useCallback>(async (value) => { + if (!value.trim()) { + return; + } + + setIsLoading(true); + try { + // Extract the base directory from the search string + let basePath: string; + if (value.endsWith("/")) { + // If it ends with "/", treat it as a directory + basePath = "/" === value ? + "/" : + value.slice(0, -1); + } else { + // If it's a file path, extract the directory part + const lastSlashIndex = value.lastIndexOf("/"); + if (-1 === lastSlashIndex) { + // No slash found, assume root directory + basePath = "/"; + } else if (0 === lastSlashIndex) { + // Path starts with "/" but has no other slashes, use root + basePath = "/"; + } else { + // Extract directory path + basePath = value.substring(0, lastSlashIndex); + } + } + + // Check if the base directory is already expanded to avoid unnecessary API calls + if (expandedKeys.includes(basePath)) { + return; + } + + const parentsLoaded = await loadMissingParents(basePath); + if (parentsLoaded) { + await fetchAndAppendTreeNodes(basePath); + } + } finally { + setIsLoading(false); + } + }, [fetchAndAppendTreeNodes, + loadMissingParents, + expandedKeys]); + + const handleTreeExpand = useCallback>((keys) => { + setExpandedKeys(keys); + }, []); + + const handleSubmit = async (values: FormValues) => { + setIsSubmitting(true); + setSubmitResult(null); + try { + const jobId = await submitCompressionJob({ + paths: values.paths, + dataset: values.dataset, + timestampKey: values.timestampKey, + }); + + setSubmitResult({success: true, message: `Compression job submitted successfully with ID: ${jobId}`}); + form.resetFields(); + } catch (error) { + setSubmitResult({ + success: false, + message: `Failed to submit compression job: ${error instanceof Error ? + error.message : + "Unknown error"}`, + }); + } finally { + setIsSubmitting(false); + } + }; + + const isClpS = "clp-s" === settings.ClpStorageEngine; + + return ( + +
+ + : + } + switcherIcon={(props) => (props.expanded ? + : + )} + onSearch={handleSearch} + onTreeExpand={handleTreeExpand}/> + + {isClpS && ( + <> + + + + + + + + )} + + + + + + {submitResult && ( + + {submitResult.message} + + )} +
+
+ ); +}; + +export default Compress; diff --git a/components/webui/client/src/pages/IngestPage/index.tsx b/components/webui/client/src/pages/IngestPage/index.tsx index d82a8fe9e7..d55e6faca0 100644 --- a/components/webui/client/src/pages/IngestPage/index.tsx +++ b/components/webui/client/src/pages/IngestPage/index.tsx @@ -1,3 +1,4 @@ +import Compress from "./Compress"; import Details from "./Details"; import styles from "./index.module.css"; import Jobs from "./Jobs"; @@ -14,6 +15,9 @@ const IngestPage = () => {
+
+ +
diff --git a/components/webui/client/src/pages/SearchPage/SearchControls/index.tsx b/components/webui/client/src/pages/SearchPage/SearchControls/index.tsx index 79413c3f77..ffd2d646ed 100644 --- a/components/webui/client/src/pages/SearchPage/SearchControls/index.tsx +++ b/components/webui/client/src/pages/SearchPage/SearchControls/index.tsx @@ -32,7 +32,7 @@ const handleSubmit = (ev: React.FormEvent) => { const SearchControls = () => { /* eslint-disable-next-line no-warning-comments */ // TODO: Remove flag and related logic when the new guide UI is fully implemented. - const isGuidedEnabled = "true" === import.meta.env[`VITE_GUIDED_DEV`]; + const isGuidedEnabled = "true" === import.meta.env["VITE_GUIDED_DEV"]; return (
diff --git a/components/webui/server/settings.json b/components/webui/server/settings.json index 9b0386021f..197f3391ec 100644 --- a/components/webui/server/settings.json +++ b/components/webui/server/settings.json @@ -3,6 +3,7 @@ "SqlDbPort": 3306, "SqlDbName": "clp-db", "SqlDbQueryJobsTableName": "query_jobs", + "SqlDbCompressionJobsTableName": "compression_jobs", "MongoDbHost": "localhost", "MongoDbPort": 27017, @@ -19,6 +20,9 @@ "StreamFilesS3Profile": null, "ClpQueryEngine": "clp", + "ClpStorageEngine": "clp", "PrestoHost": "localhost", - "PrestoPort": 8889 + "PrestoPort": 8889, + + "LsRoot": "/" } diff --git a/components/webui/server/src/plugins/app/CompressionJobDbManager/index.ts b/components/webui/server/src/plugins/app/CompressionJobDbManager/index.ts new file mode 100644 index 0000000000..44844d9af0 --- /dev/null +++ b/components/webui/server/src/plugins/app/CompressionJobDbManager/index.ts @@ -0,0 +1,89 @@ +import {brotliCompressSync} from "node:zlib"; + +import type {MySQLPromisePool} from "@fastify/mysql"; +import {encode} from "@msgpack/msgpack"; +import {FastifyInstance} from "fastify"; +import fp from "fastify-plugin"; +import {ResultSetHeader} from "mysql2"; + +import settings from "../../../../settings.json" with {type: "json"}; +import {COMPRESSION_JOBS_TABLE_COLUMN_NAMES} from "../../../typings/compression.js"; + + +export interface CompressionJobConfig { + input: { + paths_to_compress: string[]; + path_prefix_to_remove: string; + dataset?: string; + timestamp_key?: string; + }; + output: { + target_archive_size: number; + target_dictionaries_size: number; + target_segment_size: number; + target_encoded_file_size: number; + compression_level: number; + }; +} + +/** + * Class for managing compression jobs in the CLP package compression scheduler database. + */ +class CompressionJobDbManager { + readonly #sqlPool: MySQLPromisePool; + + readonly #tableName: string; + + private constructor (sqlPool: MySQLPromisePool, tableName: string) { + this.#sqlPool = sqlPool; + this.#tableName = tableName; + } + + /** + * Creates a new CompressionJobDbManager. + * + * @param fastify + * @return + */ + static create (fastify: FastifyInstance): CompressionJobDbManager { + return new CompressionJobDbManager(fastify.mysql, settings.SqlDbCompressionJobsTableName); + } + + /** + * Submits a compression job to the database. + * + * @param jobConfig + * @return The job's ID. + * @throws {Error} on error. + */ + async submitJob (jobConfig: CompressionJobConfig): Promise { + const [result] = await this.#sqlPool.query( + ` + INSERT INTO ${this.#tableName} ( + ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG} + ) + VALUES (?) + `, + [ + Buffer.from(brotliCompressSync(encode(jobConfig))), + ] + ); + + return result.insertId; + } +} + +declare module "fastify" { + interface FastifyInstance { + CompressionJobDbManager: CompressionJobDbManager; + } +} + +export default fp( + (fastify) => { + fastify.decorate("CompressionJobDbManager", CompressionJobDbManager.create(fastify)); + }, + { + name: "CompressionJobDbManager", + } +); diff --git a/components/webui/server/src/plugins/app/CompressionJobDbManager/typings.ts b/components/webui/server/src/plugins/app/CompressionJobDbManager/typings.ts new file mode 100644 index 0000000000..77c2b433b8 --- /dev/null +++ b/components/webui/server/src/plugins/app/CompressionJobDbManager/typings.ts @@ -0,0 +1,6 @@ +/** + * Interval (in milliseconds) for polling a job's completion status. 
+ */ +const JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS = 1000; + +export {JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS}; diff --git a/components/webui/server/src/plugins/app/QueryJobDbManager/index.ts b/components/webui/server/src/plugins/app/QueryJobDbManager/index.ts index 6e84ad5e45..f3fe90adb6 100644 --- a/components/webui/server/src/plugins/app/QueryJobDbManager/index.ts +++ b/components/webui/server/src/plugins/app/QueryJobDbManager/index.ts @@ -51,7 +51,7 @@ class QueryJobDbManager { async submitJob (jobConfig: object, jobType: QUERY_JOB_TYPE): Promise { const [result] = await this.#sqlPool.query( ` - INSERT INTO ${settings.SqlDbQueryJobsTableName} ( + INSERT INTO ${this.#tableName} ( ${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG}, ${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE} ) diff --git a/components/webui/server/src/routes/api/compress/index.ts b/components/webui/server/src/routes/api/compress/index.ts new file mode 100644 index 0000000000..efb265af55 --- /dev/null +++ b/components/webui/server/src/routes/api/compress/index.ts @@ -0,0 +1,89 @@ +import {FastifyPluginAsyncTypebox} from "@fastify/type-provider-typebox"; +import {StatusCodes} from "http-status-codes"; + +import settings from "../../../../settings.json" with {type: "json"}; +import {CompressionJobConfig} from "../../../plugins/app/CompressionJobDbManager/index.js"; +import { + CompressionJobCreationSchema, + CompressionJobSchema, +} from "../../../schemas/compression.js"; +import {ErrorSchema} from "../../../schemas/error.js"; + + +const DEFAULT_PATH_PREFIX = "/mnt/logs"; + +/** + * Default compression job configuration. + */ +const DEFAULT_COMPRESSION_JOB_CONFIG: CompressionJobConfig = Object.freeze({ + input: { + paths_to_compress: [], + path_prefix_to_remove: DEFAULT_PATH_PREFIX, + }, + output: { + compression_level: 3, + target_archive_size: 268435456, + target_dictionaries_size: 33554432, + target_encoded_file_size: 268435456, + target_segment_size: 268435456, + }, +}); + +/** + * Compression API routes. + * + * @param fastify + */ +const plugin: FastifyPluginAsyncTypebox = async (fastify) => { + const {CompressionJobDbManager} = fastify; + + /** + * Submits a compression job and initiates the compression process. 
+ */ + fastify.post( + "/", + { + schema: { + body: CompressionJobCreationSchema, + response: { + [StatusCodes.CREATED]: CompressionJobSchema, + [StatusCodes.INTERNAL_SERVER_ERROR]: ErrorSchema, + }, + tags: ["Compression"], + }, + }, + async (request, reply) => { + const { + paths, + dataset, + timestampKey, + } = request.body; + + const jobConfig: CompressionJobConfig = structuredClone(DEFAULT_COMPRESSION_JOB_CONFIG); + jobConfig.input.paths_to_compress = paths.map((path) => DEFAULT_PATH_PREFIX + path); + + if ("clp-s" === settings.ClpStorageEngine) { + if ("undefined" !== typeof dataset) { + jobConfig.input.dataset = dataset; + } + if ("undefined" !== typeof timestampKey) { + jobConfig.input.timestamp_key = timestampKey; + } + } + + try { + const jobId = await CompressionJobDbManager.submitJob(jobConfig); + reply.code(StatusCodes.CREATED); + + return {jobId}; + } catch (err: unknown) { + const errMsg = "Unable to submit compression job to the SQL database"; + request.log.error(err, errMsg); + + return reply.internalServerError(errMsg); + } + } + ); +}; + +export default plugin; diff --git a/components/webui/server/src/routes/api/os/index.ts b/components/webui/server/src/routes/api/os/index.ts new file mode 100644 index 0000000000..9ddf55af37 --- /dev/null +++ b/components/webui/server/src/routes/api/os/index.ts @@ -0,0 +1,109 @@ +import {FastifyPluginAsyncTypebox} from "@fastify/type-provider-typebox"; +import {Type} from "@sinclair/typebox"; +import fs from "fs/promises"; +import {StatusCodes} from "http-status-codes"; +import path from "path"; + +import settings from "../../../../settings.json" with {type: "json"}; +import {StringSchema} from "../../../schemas/common.js"; + + +/** + * Resolves a requested path against the LsRoot setting. + * + * @param requestedPath The path requested by the client + * @return The resolved absolute path + */ +const resolveLsPath = (requestedPath: string): string => { + let cleanPath = requestedPath; + + // Remove leading slashes for non-root LsRoot settings + if ("/" !== settings.LsRoot) { + cleanPath = cleanPath.replace(/^\/+/, ""); + } + + return path.resolve(settings.LsRoot, cleanPath); +}; + +/** + * Normalizes a path for client display by removing the LsRoot prefix. + * + * @param fullPath The full path to normalize + * @return The normalized path relative to LsRoot + */ +const normalizeLsPath = (fullPath: string): string => { + return fullPath.replace(new RegExp(`^${settings.LsRoot}/*`), "/"); +}; + + +const FileListRequestSchema = Type.Object({ + path: Type.String({ + default: "/", + }), +}); + + +const FileListResponseSchema = Type.Array( + Type.Object({ + isExpandable: Type.Boolean(), + name: StringSchema, + parentPath: StringSchema, + }) +); + + +/** + * File listing API routes. + * + * @param fastify + */ +const plugin: FastifyPluginAsyncTypebox = async (fastify) => { + /** + * Lists files and directories at the specified path. 
+ */ + fastify.get( + "/ls", + { + schema: { + querystring: FileListRequestSchema, + response: { + [StatusCodes.OK]: FileListResponseSchema, + }, + }, + }, + async (request, reply) => { + const {path: requestedPath} = request.query; + + try { + const resolvedPath = resolveLsPath(requestedPath); + + try { + await fs.access(resolvedPath); + } catch { + return await reply.notFound(`Path not found: ${resolvedPath}`); + } + + const direntList = await fs.readdir(resolvedPath, {withFileTypes: true}); + return direntList.map((dirent) => { + const isExpandable = dirent.isDirectory() || dirent.isSymbolicLink(); + return { + isExpandable: isExpandable, + name: dirent.name, + parentPath: normalizeLsPath(dirent.parentPath), + }; + }); + } catch (e: unknown) { + if (reply.sent) { + return Promise.resolve(); + } + + const errMsg = "Unable to list files"; + request.log.error(e, errMsg); + + return reply.internalServerError(errMsg); + } + } + ); +}; + +export default plugin; diff --git a/components/webui/server/src/schemas/compression.ts b/components/webui/server/src/schemas/compression.ts new file mode 100644 index 0000000000..761683046e --- /dev/null +++ b/components/webui/server/src/schemas/compression.ts @@ -0,0 +1,23 @@ +import {Type} from "@fastify/type-provider-typebox"; + + +/** + * Schema for request to create a new compression job. + */ +const CompressionJobCreationSchema = Type.Object({ + paths: Type.Array(Type.String()), + dataset: Type.Optional(Type.String()), + timestampKey: Type.Optional(Type.String()), +}); + +/** + * Schema for compression job response. + */ +const CompressionJobSchema = Type.Object({ + jobId: Type.Number(), +}); + +export { + CompressionJobCreationSchema, + CompressionJobSchema, +}; diff --git a/components/webui/server/src/typings/compression.ts b/components/webui/server/src/typings/compression.ts new file mode 100644 index 0000000000..ea1cc52219 --- /dev/null +++ b/components/webui/server/src/typings/compression.ts @@ -0,0 +1,47 @@ +import {RowDataPacket} from "mysql2/promise"; + + +/** + * Matching the `CompressionJobStatus` class in `job_orchestration.scheduler.constants`. + * + * @enum {number} + */ +enum COMPRESSION_JOB_STATUS { + PENDING = 0, + RUNNING, + SUCCEEDED, + FAILED, + KILLED, +} + +/** + * List of states that indicate the job is either pending or in progress. + */ +const COMPRESSION_JOB_STATUS_WAITING_STATES = new Set([ + COMPRESSION_JOB_STATUS.PENDING, + COMPRESSION_JOB_STATUS.RUNNING, +]); + +/** + * The `compression_jobs` table's column names. + * + * @enum {string} + */ +enum COMPRESSION_JOBS_TABLE_COLUMN_NAMES { + ID = "id", + STATUS = "status", + JOB_CONFIG = "clp_config", +} + +interface CompressionJob extends RowDataPacket { + [COMPRESSION_JOBS_TABLE_COLUMN_NAMES.ID]: number; + [COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS]: COMPRESSION_JOB_STATUS; + [COMPRESSION_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG]: string; +} + +export type {CompressionJob}; +export { + COMPRESSION_JOB_STATUS, + COMPRESSION_JOB_STATUS_WAITING_STATES, + COMPRESSION_JOBS_TABLE_COLUMN_NAMES, +}; diff --git a/components/webui/server/src/typings/query.ts b/components/webui/server/src/typings/query.ts index 1cefbe614f..e12b89b01b 100644 --- a/components/webui/server/src/typings/query.ts +++ b/components/webui/server/src/typings/query.ts @@ -2,7 +2,7 @@ import {RowDataPacket} from "mysql2/promise"; /** - * Matching the `QueryJobType` class in `job_orchestration.query_scheduler.constants`. + * Matching the `QueryJobType` class in `job_orchestration.scheduler.constants`. 
diff --git a/components/webui/server/src/typings/query.ts b/components/webui/server/src/typings/query.ts
index 1cefbe614f..e12b89b01b 100644
--- a/components/webui/server/src/typings/query.ts
+++ b/components/webui/server/src/typings/query.ts
@@ -2,7 +2,7 @@ import {RowDataPacket} from "mysql2/promise";
 
 
 /**
- * Matching the `QueryJobType` class in `job_orchestration.query_scheduler.constants`.
+ * Matching the `QueryJobType` class in `job_orchestration.scheduler.constants`.
  */
 enum QUERY_JOB_TYPE {
     SEARCH_OR_AGGREGATION = 0,
@@ -20,7 +20,7 @@ const EXTRACT_JOB_TYPES = new Set([
 /**
  * Matching the `QueryJobStatus` class in
- * `job_orchestration.query_scheduler.constants`.
+ * `job_orchestration.scheduler.constants`.
  *
  * @enum {number}
  */
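COMPRESSION_JOB_STATUS_WAITING_STATES and JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS are defined in this diff but not yet consumed; a follow-up status endpoint would presumably poll the compression_jobs table with them. A sketch of that loop, where the function name, pool parameter, and SELECT shape are assumptions rather than part of this change:

import {setTimeout} from "node:timers/promises";

import type {MySQLPromisePool} from "@fastify/mysql";

import {JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS} from "./plugins/app/CompressionJobDbManager/typings.js";
import {
    COMPRESSION_JOB_STATUS,
    COMPRESSION_JOB_STATUS_WAITING_STATES,
    COMPRESSION_JOBS_TABLE_COLUMN_NAMES,
    type CompressionJob,
} from "./typings/compression.js";

// Hypothetical: poll the compression_jobs table until the job leaves the
// PENDING/RUNNING states, then report whether it succeeded.
const waitForJobCompletion = async (sqlPool: MySQLPromisePool, jobId: number): Promise<boolean> => {
    for (;;) {
        const [rows] = await sqlPool.query<CompressionJob[]>(
            `SELECT ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS}
             FROM compression_jobs
             WHERE ${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.ID} = ?`,
            [jobId],
        );

        const status = rows[0]?.status;
        if ("undefined" !== typeof status && !COMPRESSION_JOB_STATUS_WAITING_STATES.has(status)) {
            return COMPRESSION_JOB_STATUS.SUCCEEDED === status;
        }

        await setTimeout(JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS);
    }
};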