diff --git a/src/VirtualClient/VirtualClient.Actions/Cassandra/CassandraExecutor.cs b/src/VirtualClient/VirtualClient.Actions/Cassandra/CassandraExecutor.cs new file mode 100644 index 0000000000..d3bc0f5734 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/Cassandra/CassandraExecutor.cs @@ -0,0 +1,153 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO.Abstractions; + using System.Runtime.InteropServices; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Logging; + using VirtualClient.Common; + using VirtualClient.Common.Extensions; + using VirtualClient.Common.Platform; + using VirtualClient.Common.Telemetry; + using VirtualClient.Contracts; + using VirtualClient.Contracts.Metadata; + + /// + /// Executor. + /// + public class CassandraExecutor : VirtualClientComponent + { + private IFileSystem fileSystem; + private IPackageManager packageManager; + private IStateManager stateManager; + private ISystemManagement systemManager; + // private string cassandraDirectory = @"."; + + /// + /// Executor. + /// + public CassandraExecutor(IServiceCollection dependencies, IDictionary parameters) + : base(dependencies, parameters) + { + this.systemManager = this.Dependencies.GetService(); + this.packageManager = this.systemManager.PackageManager; + this.stateManager = this.systemManager.StateManager; + this.fileSystem = this.systemManager.FileSystem; + } + + /// + /// Cassandra space separated input files or directories + /// + public string InputFilesOrDirs + { + get + { + this.Parameters.TryGetValue(nameof(CassandraExecutor.InputFilesOrDirs), out IConvertible inputFilesOrDirs); + return inputFilesOrDirs?.ToString(); + } + } + + /// + /// Executes the workload. + /// + protected override async Task ExecuteAsync(EventContext telemetryContext, CancellationToken cancellationToken) + { + try + { + // Command and parameters specified in the workload configuration + // Execute the command + using (BackgroundOperations profiling = BackgroundOperations.BeginProfiling(this, cancellationToken)) + { + // this.Logger.LogInformation($"inside using profiling"); + string command = this.Parameters.GetValue("command"); + string argument = this.Parameters.GetValue("parameters"); + using (IProcessProxy process = await this.ExecuteCommandAsync(command, argument, ".", telemetryContext, cancellationToken) + .ConfigureAwait(false)) + { + if (!cancellationToken.IsCancellationRequested) + { + await this.LogProcessDetailsAsync(process, telemetryContext, "Cassandra", logToFile: true); + process.ThrowIfWorkloadFailed(); + this.CaptureMetrics(process, telemetryContext, argument); + } + + if (process.ExitCode != 0) + { + throw new WorkloadException($"Command failed with exit code {process.ExitCode}."); + } + } + } + } + catch (Exception ex) + { + // Log the error and rethrow + this.Logger.LogMessage($"Failed to parse cassandra output: {ex.Message}", LogLevel.Warning, telemetryContext); + throw new WorkloadException($"Failed to parse cassandra output: {ex.Message}", ex); + } + } + + private async Task ExecuteCommandAsync(string pathToExe, string commandLineArguments, string workingDirectory, CancellationToken cancellationToken) + { + if (!cancellationToken.IsCancellationRequested) + { + this.Logger.LogTraceMessage($"Executing process '{pathToExe}' '{commandLineArguments}' at directory '{workingDirectory}'."); + + EventContext telemetryContext = EventContext.Persisted() + .AddContext("command", pathToExe) + .AddContext("commandArguments", commandLineArguments); + + await this.Logger.LogMessageAsync($"{nameof(CassandraExecutor)}.ExecuteProcess", telemetryContext, async () => + { + DateTime start = DateTime.Now; + using (IProcessProxy process = this.systemManager.ProcessManager.CreateElevatedProcess(this.Platform, pathToExe, commandLineArguments, workingDirectory)) + { + this.CleanupTasks.Add(() => process.SafeKill()); + await process.StartAndWaitAsync(cancellationToken) + .ConfigureAwait(false); + + if (!cancellationToken.IsCancellationRequested) + { + await this.LogProcessDetailsAsync(process, telemetryContext) + .ConfigureAwait(false); + + process.ThrowIfErrored(errorReason: ErrorReason.WorkloadFailed); + } + } + }).ConfigureAwait(false); + } + } + + private void CaptureMetrics(IProcessProxy process, EventContext telemetryContext, string commandArguments) + { + process.ThrowIfNull(nameof(process)); + + this.MetadataContract.AddForScenario( + "cassandra", + commandArguments, + toolVersion: null); + + this.MetadataContract.Apply(telemetryContext); + + CassandraMetricsParser parser = new CassandraMetricsParser(process.StandardOutput.ToString()); + IList metrics = parser.Parse(); + + this.Logger.LogMetrics( + "cassandra", + this.Scenario, + process.StartTime, + process.ExitTime, + metrics, + null, + commandArguments, + this.Tags, + telemetryContext); + } + } +} diff --git a/src/VirtualClient/VirtualClient.Actions/Cassandra/CassandraMetricsParser.cs b/src/VirtualClient/VirtualClient.Actions/Cassandra/CassandraMetricsParser.cs new file mode 100644 index 0000000000..88e2baec87 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/Cassandra/CassandraMetricsParser.cs @@ -0,0 +1,102 @@ +namespace VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.Data; + using System.Text.RegularExpressions; + using VirtualClient.Common.Contracts; + using VirtualClient.Contracts; + using DataTableExtensions = VirtualClient.Contracts.DataTableExtensions; + + /// + /// Parser for Cassandra output document. + /// + public class CassandraMetricsParser : MetricsParser + { + private static readonly Regex OpRateRegex = new Regex(@"Op rate\s+:\s+([\d,]+) op/s", RegexOptions.Multiline); + private static readonly Regex PartitionRateRegex = new Regex(@"Partition rate\s+:\s+([\d,]+) pk/s", RegexOptions.Multiline); + private static readonly Regex RowRateRegex = new Regex(@"Row rate\s+:\s+([\d,]+) row/s", RegexOptions.Multiline); + private static readonly Regex LatencyMeanRegex = new Regex(@"Latency mean\s+:\s+([\d.]+) ms", RegexOptions.Multiline); + private static readonly Regex LatencyMaxRegex = new Regex(@"Latency max\s+:\s+([\d.]+) ms", RegexOptions.Multiline); + private static readonly Regex TotalErrorsRegex = new Regex(@"Total errors\s+:\s+([\d,]+)", RegexOptions.Multiline); + private static readonly Regex TotalOperationTimeRegex = new Regex(@"Total operation time\s+:\s+(\d{2}:\d{2}:\d{2})", RegexOptions.Multiline); + + /// + /// constructor for . + /// + /// Raw text to parse. + public CassandraMetricsParser(string rawText) + : base(rawText) + { + } + + /// + /// List of parsed metrics from the YCSB output. + /// + public List Metrics { get; set; } = new List(); + + /// + public override IList Parse() + { + this.Preprocess(); + this.ThrowIfInvalidOutputFormat(); + this.ExtractMetrics(); + + return this.Metrics; + } + + /// + protected override void Preprocess() + { + // Normalize the text to ensure consistent formatting. + this.PreprocessedText = Regex.Replace(this.RawText, "\r\n", "\n"); + this.PreprocessedText = Regex.Replace(this.PreprocessedText, "\n\n", "\n"); // Consolidate multiple newlines + } + + /// + private void ThrowIfInvalidOutputFormat() + { + if (string.IsNullOrWhiteSpace(this.PreprocessedText)) + { + throw new SchemaException("The Cassandra Stress output has incorrect format for parsing: empty or null text."); + } + + if (!OpRateRegex.IsMatch(this.PreprocessedText)) + { + throw new SchemaException("The Cassandra Stress output has incorrect format for parsing: missing key metrics."); + } + } + + private void ExtractMetrics() + { + this.ExtractMetric(OpRateRegex, "Op Rate", "ops/s", true); + this.ExtractMetric(PartitionRateRegex, "Partition Rate", "pk/s", false); + this.ExtractMetric(RowRateRegex, "Row Rate", "row/s", true); + this.ExtractMetric(LatencyMeanRegex, "Latency Mean", "ms", false); + this.ExtractMetric(LatencyMaxRegex, "Latency Max", "ms", false); + this.ExtractMetric(TotalErrorsRegex, "Total Errors", "count", false); + var match = TotalOperationTimeRegex.Match(this.PreprocessedText); + if (match.Success) + { + if (TimeSpan.TryParse(match.Groups[1].Value, out TimeSpan operationTime)) + { + double totalSeconds = operationTime.TotalSeconds; + this.Metrics.Add(new Metric("Total Operation Time", totalSeconds, "seconds", MetricRelativity.LowerIsBetter)); + } + else + { + throw new FormatException($"Invalid operation time format: {match.Groups[1].Value}"); + } + } + } + + private void ExtractMetric(Regex regex, string metricName, string unit, bool higherIsBetter) + { + var match = regex.Match(this.PreprocessedText); + if (match.Success) + { + this.Metrics.Add(new Metric(metricName, Convert.ToDouble(match.Groups[1].Value), unit, higherIsBetter ? MetricRelativity.HigherIsBetter : MetricRelativity.LowerIsBetter)); + } + } + } +} diff --git a/src/VirtualClient/VirtualClient.Main/profiles/PERF-CASSANDRA.json b/src/VirtualClient/VirtualClient.Main/profiles/PERF-CASSANDRA.json new file mode 100644 index 0000000000..73e5c53085 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Main/profiles/PERF-CASSANDRA.json @@ -0,0 +1,28 @@ +{ + "Description": "Cassandra Workload", + "Metadata": { + "RecommendedMinimumExecutionTime": "00:05:00", + "SupportedPlatforms": "linux-x64,linux-arm64,win-x64,win-arm64", + "SupportedOperatingSystems": "CBL-Mariner,CentOS,Debian,RedHat,Suse,Ubuntu,Windows" + }, + "Actions": [ + { + "Type": "CassandraExecutor", + "Parameters": { + "Scenario": "Cassandra stress test for READ", + "command": "cassandra-stress", + "parameters": "read n=20000 -rate threads=50" + } + } + ], + "Dependencies": [ + { + "Type": "LinuxPackageInstallation", + "Parameters": { + "Scenario": "InstallCassandraPackage", + "Packages-Apt": "cassandra" + } + } + ] +} +