Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions src/VirtualClient/VirtualClient.Actions/Cassandra/CassandraExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace VirtualClient.Actions
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Abstractions;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using VirtualClient.Common;
using VirtualClient.Common.Extensions;
using VirtualClient.Common.Platform;
using VirtualClient.Common.Telemetry;
using VirtualClient.Contracts;
using VirtualClient.Contracts.Metadata;

/// <summary>
/// Executor.
/// </summary>
public class CassandraExecutor : VirtualClientComponent
{
private IFileSystem fileSystem;
private IPackageManager packageManager;
private IStateManager stateManager;
private ISystemManagement systemManager;
// private string cassandraDirectory = @".";

/// <summary>
/// Executor.
/// </summary>
public CassandraExecutor(IServiceCollection dependencies, IDictionary<string, IConvertible> parameters)
: base(dependencies, parameters)
{
this.systemManager = this.Dependencies.GetService<ISystemManagement>();
this.packageManager = this.systemManager.PackageManager;
this.stateManager = this.systemManager.StateManager;
this.fileSystem = this.systemManager.FileSystem;
}

/// <summary>
/// Cassandra space separated input files or directories
/// </summary>
public string InputFilesOrDirs
{
get
{
this.Parameters.TryGetValue(nameof(CassandraExecutor.InputFilesOrDirs), out IConvertible inputFilesOrDirs);
return inputFilesOrDirs?.ToString();
}
}

/// <summary>
/// Executes the workload.
/// </summary>
protected override async Task ExecuteAsync(EventContext telemetryContext, CancellationToken cancellationToken)
{
try
{
// Command and parameters specified in the workload configuration
// Execute the command
using (BackgroundOperations profiling = BackgroundOperations.BeginProfiling(this, cancellationToken))
{
// this.Logger.LogInformation($"inside using profiling");
string command = this.Parameters.GetValue<string>("command");
string argument = this.Parameters.GetValue<string>("parameters");
using (IProcessProxy process = await this.ExecuteCommandAsync(command, argument, ".", telemetryContext, cancellationToken)
.ConfigureAwait(false))
{
if (!cancellationToken.IsCancellationRequested)
{
await this.LogProcessDetailsAsync(process, telemetryContext, "Cassandra", logToFile: true);
process.ThrowIfWorkloadFailed();
this.CaptureMetrics(process, telemetryContext, argument);
}

if (process.ExitCode != 0)
{
throw new WorkloadException($"Command failed with exit code {process.ExitCode}.");
}
}
}
}
catch (Exception ex)
{
// Log the error and rethrow
this.Logger.LogMessage($"Failed to parse cassandra output: {ex.Message}", LogLevel.Warning, telemetryContext);
throw new WorkloadException($"Failed to parse cassandra output: {ex.Message}", ex);
}
}

private async Task ExecuteCommandAsync(string pathToExe, string commandLineArguments, string workingDirectory, CancellationToken cancellationToken)
{
if (!cancellationToken.IsCancellationRequested)
{
this.Logger.LogTraceMessage($"Executing process '{pathToExe}' '{commandLineArguments}' at directory '{workingDirectory}'.");

EventContext telemetryContext = EventContext.Persisted()
.AddContext("command", pathToExe)
.AddContext("commandArguments", commandLineArguments);

await this.Logger.LogMessageAsync($"{nameof(CassandraExecutor)}.ExecuteProcess", telemetryContext, async () =>
{
DateTime start = DateTime.Now;
using (IProcessProxy process = this.systemManager.ProcessManager.CreateElevatedProcess(this.Platform, pathToExe, commandLineArguments, workingDirectory))
{
this.CleanupTasks.Add(() => process.SafeKill());
await process.StartAndWaitAsync(cancellationToken)
.ConfigureAwait(false);

if (!cancellationToken.IsCancellationRequested)
{
await this.LogProcessDetailsAsync(process, telemetryContext)
.ConfigureAwait(false);

process.ThrowIfErrored<WorkloadException>(errorReason: ErrorReason.WorkloadFailed);
}
}
}).ConfigureAwait(false);
}
}

private void CaptureMetrics(IProcessProxy process, EventContext telemetryContext, string commandArguments)
{
process.ThrowIfNull(nameof(process));

this.MetadataContract.AddForScenario(
"cassandra",
commandArguments,
toolVersion: null);

this.MetadataContract.Apply(telemetryContext);

CassandraMetricsParser parser = new CassandraMetricsParser(process.StandardOutput.ToString());
IList<Metric> metrics = parser.Parse();

this.Logger.LogMetrics(
"cassandra",
this.Scenario,
process.StartTime,
process.ExitTime,
metrics,
null,
commandArguments,
this.Tags,
telemetryContext);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
namespace VirtualClient.Actions
{
using System;
using System.Collections.Generic;
using System.Data;
using System.Text.RegularExpressions;
using VirtualClient.Common.Contracts;
using VirtualClient.Contracts;
using DataTableExtensions = VirtualClient.Contracts.DataTableExtensions;

/// <summary>
/// Parser for Cassandra output document.
/// </summary>
public class CassandraMetricsParser : MetricsParser
{
private static readonly Regex OpRateRegex = new Regex(@"Op rate\s+:\s+([\d,]+) op/s", RegexOptions.Multiline);
private static readonly Regex PartitionRateRegex = new Regex(@"Partition rate\s+:\s+([\d,]+) pk/s", RegexOptions.Multiline);
private static readonly Regex RowRateRegex = new Regex(@"Row rate\s+:\s+([\d,]+) row/s", RegexOptions.Multiline);
private static readonly Regex LatencyMeanRegex = new Regex(@"Latency mean\s+:\s+([\d.]+) ms", RegexOptions.Multiline);
private static readonly Regex LatencyMaxRegex = new Regex(@"Latency max\s+:\s+([\d.]+) ms", RegexOptions.Multiline);
private static readonly Regex TotalErrorsRegex = new Regex(@"Total errors\s+:\s+([\d,]+)", RegexOptions.Multiline);
private static readonly Regex TotalOperationTimeRegex = new Regex(@"Total operation time\s+:\s+(\d{2}:\d{2}:\d{2})", RegexOptions.Multiline);

/// <summary>
/// constructor for <see cref="CassandraMetricsParser"/>.
/// </summary>
/// <param name="rawText">Raw text to parse.</param>
public CassandraMetricsParser(string rawText)
: base(rawText)
{
}

/// <summary>
/// List of parsed metrics from the YCSB output.
/// </summary>
public List<Metric> Metrics { get; set; } = new List<Metric>();

/// <inheritdoc/>
public override IList<Metric> Parse()
{
this.Preprocess();
this.ThrowIfInvalidOutputFormat();
this.ExtractMetrics();

return this.Metrics;
}

/// <inheritdoc/>
protected override void Preprocess()
{
// Normalize the text to ensure consistent formatting.
this.PreprocessedText = Regex.Replace(this.RawText, "\r\n", "\n");
this.PreprocessedText = Regex.Replace(this.PreprocessedText, "\n\n", "\n"); // Consolidate multiple newlines
}

/// <inheritdoc/>
private void ThrowIfInvalidOutputFormat()
{
if (string.IsNullOrWhiteSpace(this.PreprocessedText))
{
throw new SchemaException("The Cassandra Stress output has incorrect format for parsing: empty or null text.");
}

if (!OpRateRegex.IsMatch(this.PreprocessedText))
{
throw new SchemaException("The Cassandra Stress output has incorrect format for parsing: missing key metrics.");
}
}

private void ExtractMetrics()
{
this.ExtractMetric(OpRateRegex, "Op Rate", "ops/s", true);
this.ExtractMetric(PartitionRateRegex, "Partition Rate", "pk/s", false);
this.ExtractMetric(RowRateRegex, "Row Rate", "row/s", true);
this.ExtractMetric(LatencyMeanRegex, "Latency Mean", "ms", false);
this.ExtractMetric(LatencyMaxRegex, "Latency Max", "ms", false);
this.ExtractMetric(TotalErrorsRegex, "Total Errors", "count", false);
var match = TotalOperationTimeRegex.Match(this.PreprocessedText);
if (match.Success)
{
if (TimeSpan.TryParse(match.Groups[1].Value, out TimeSpan operationTime))
{
double totalSeconds = operationTime.TotalSeconds;
this.Metrics.Add(new Metric("Total Operation Time", totalSeconds, "seconds", MetricRelativity.LowerIsBetter));
}
else
{
throw new FormatException($"Invalid operation time format: {match.Groups[1].Value}");
}
}
}

private void ExtractMetric(Regex regex, string metricName, string unit, bool higherIsBetter)
{
var match = regex.Match(this.PreprocessedText);
if (match.Success)
{
this.Metrics.Add(new Metric(metricName, Convert.ToDouble(match.Groups[1].Value), unit, higherIsBetter ? MetricRelativity.HigherIsBetter : MetricRelativity.LowerIsBetter));
}
}
}
}
28 changes: 28 additions & 0 deletions src/VirtualClient/VirtualClient.Main/profiles/PERF-CASSANDRA.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"Description": "Cassandra Workload",
"Metadata": {
"RecommendedMinimumExecutionTime": "00:05:00",
"SupportedPlatforms": "linux-x64,linux-arm64,win-x64,win-arm64",
"SupportedOperatingSystems": "CBL-Mariner,CentOS,Debian,RedHat,Suse,Ubuntu,Windows"
},
"Actions": [
{
"Type": "CassandraExecutor",
"Parameters": {
"Scenario": "Cassandra stress test for READ",
"command": "cassandra-stress",
"parameters": "read n=20000 -rate threads=50"
}
}
],
"Dependencies": [
{
"Type": "LinuxPackageInstallation",
"Parameters": {
"Scenario": "InstallCassandraPackage",
"Packages-Apt": "cassandra"
}
}
]
}