24 changes: 12 additions & 12 deletions metrics_utility/anonymized_rollups/anonymized_rollups.py
@@ -4,8 +4,6 @@

import pandas as pd

-from pandas import DataFrame
-
from metrics_utility.anonymized_rollups.base_anonymized_rollup import BaseAnonymizedRollup
from metrics_utility.anonymized_rollups.events_modules_anonymized_rollup import EventModulesAnonymizedRollup
from metrics_utility.anonymized_rollups.execution_environments_anonymized_rollup import ExecutionEnvironmentsAnonymizedRollup
@@ -110,15 +108,17 @@ def flatten_json_report(data: Dict[str, Any]) -> Dict[str, Any]:
# from events_modules
'modules_used_to_automate_total': events_modules.get('modules_used_to_automate_total'),
'avg_number_of_modules_used_in_a_playbooks': events_modules.get('avg_number_of_modules_used_in_a_playbooks'),
-'total_hosts_automated': events_modules.get('total_hosts_automated'),
+'hosts_automated_total': events_modules.get('hosts_automated_total'),
+'event_total': events_modules.get('event_total'),
# from execution_environments
-'total_EE': execution_environments.get('total_EE'),
-'default_EE': execution_environments.get('default_EE'),
-'custom_EE': execution_environments.get('custom_EE'),
+'EE_total': execution_environments.get('EE_total'),
+'EE_default_total': execution_environments.get('EE_default_total'),
+'EE_custom_total': execution_environments.get('EE_custom_total'),
# from jobs
'jobs_total': jobs.get('jobs_total'),
# from job_host_summary
-'total_unique_hosts': job_host_summary_root.get('total_unique_hosts'),
+'unique_hosts_total': job_host_summary_root.get('unique_hosts_total'),
+'jobhostsummary_total': job_host_summary_root.get('jobhostsummary_total'),
}

# 2) modules_used_per_playbook (convert map -> array)
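For reference, a minimal sketch of the flattened statistics block these renames produce. The key names come from the hunk above; the values are invented:

statistics = {
    'modules_used_to_automate_total': 42,
    'avg_number_of_modules_used_in_a_playbooks': 3.5,
    'hosts_automated_total': 9,
    'event_total': 120,
    'EE_total': 5,
    'EE_default_total': 2,
    'EE_custom_total': 3,
    'jobs_total': 17,
    'unique_hosts_total': 9,
    'jobhostsummary_total': 30,
}
# Every counter now ends in *_total, the naming convention this change settles on.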
@@ -209,14 +209,14 @@ def compute_anonymized_rollup_from_raw_data(input_data, salt, since, until, base
# inside tarball is file named {collector_name}.csv
# this goes to dataframe, then filter_function is applied to the dataframe
# all result dataframes are concatenated into one dataframe
-def load_anonymized_rollup_data(rollup_object: BaseAnonymizedRollup, file_list: []) -> DataFrame:
+def load_anonymized_rollup_data(rollup_object: BaseAnonymizedRollup, file_list: []):
# file_list - list of csv files that needs to be read

-concat_dataframe = pd.DataFrame()
+concat_data = pd.DataFrame()

for file in file_list:
df = pd.read_csv(file)
-df = rollup_object.prepare(df)
-concat_dataframe = rollup_object.merge(concat_dataframe, df)
+prepared_data = rollup_object.prepare(df)
+concat_data = rollup_object.merge(concat_data, prepared_data)

-return concat_dataframe
+return concat_data
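Taken together, the loop above now relies on a small contract between the loader and each rollup: prepare() may return either a DataFrame or a dict, and merge() must accept the empty-DataFrame seed on the first pass. A minimal sketch, assuming the rollup class can be instantiated in your environment and with hypothetical batch file names:

import pandas as pd
from metrics_utility.anonymized_rollups.events_modules_anonymized_rollup import EventModulesAnonymizedRollup

rollup = EventModulesAnonymizedRollup()
acc = pd.DataFrame()  # empty seed, exactly as in load_anonymized_rollup_data
for path in ['batch_0.csv', 'batch_1.csv']:  # hypothetical CSVs from the tarball
    df = pd.read_csv(path)
    acc = rollup.merge(acc, rollup.prepare(df))
# For dict-based rollups, acc ends up as
# {'event_total': <int>, 'task_summary': <DataFrame>} and is handed to base(acc).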
4 changes: 4 additions & 0 deletions metrics_utility/anonymized_rollups/base_anonymized_rollup.py
@@ -81,6 +81,10 @@ def save_rollup(self, rollup_data: dict, base_path: str, since: datetime, until:
# Sanitize and store JSON data in memory for tar
sanitized_value = sanitize_json(value)
tar_files[f'{filename}.json'] = json.dumps(sanitized_value, indent=2).encode('utf-8')
+elif isinstance(value, (int, float, str, bool)) or value is None:
+# Handle scalar values (int, float, str, bool, None) by wrapping in a dict
+sanitized_value = sanitize_json({key: value})
+tar_files[f'{filename}.json'] = json.dumps(sanitized_value, indent=2).encode('utf-8')
# the rest
else:
print(f'Key {key} is a unknown type')
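A sketch of what the new branch buys; sanitize_json, tar_files, and json are the method's existing names, while the key and value here are invented:

# Previously, a scalar entry such as rollup_data = {'event_total': 120} fell
# through to the unknown-type branch and was silently dropped. Now:
sanitized_value = sanitize_json({'event_total': 120})  # wraps the scalar in a dict
tar_files['event_total.json'] = json.dumps(sanitized_value, indent=2).encode('utf-8')
# The tarball member event_total.json then contains: {"event_total": 120}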
metrics_utility/anonymized_rollups/events_modules_anonymized_rollup.py
@@ -83,10 +83,32 @@ def __init__(self):
with open(collections_path, 'r') as f:
self.collections = json.load(f)

+def merge(self, data_all, data_new):
+"""
+Override merge to handle the new structure with event_total and task_summary.
+Concatenates task_summary dataframes and sums event_total.
+"""
+# Handle initial empty DataFrame case (first iteration from load_anonymized_rollup_data)
+if isinstance(data_all, pd.DataFrame) and data_all.empty:
+return data_new
+
+# Handle initial empty dict case
+if isinstance(data_all, dict) and not data_all:
+return data_new
+
+# Concatenate task_summary dataframes and sum event_totals
+return {
+'event_total': data_all['event_total'] + data_new['event_total'],
+'task_summary': pd.concat([data_all['task_summary'], data_new['task_summary']], ignore_index=True),
+}
+
# Prepare is run for each batch of data
# then it is merged with other batches into one dataframes
# as default, merging is done by concatenating dataframes (defined in base class)
def prepare(self, dataframe):
+# Count all events before pruning
+event_total = len(dataframe)
+
# Failure/Success rate of modules
success_events_list = ['runner_on_ok', 'runner_on_async_ok', 'runner_item_on_ok']
failed_events_list = ['runner_on_failed', 'runner_on_async_failed', 'runner_item_on_failed']
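A free-standing sketch of the merge semantics just added, with the three cases exercised on invented batches:

import pandas as pd

def merge(data_all, data_new):
    # Same logic as the method above, lifted out of the class for illustration
    if isinstance(data_all, pd.DataFrame) and data_all.empty:
        return data_new  # first iteration: the loader seeds with an empty DataFrame
    if isinstance(data_all, dict) and not data_all:
        return data_new  # first iteration: empty-dict seed
    return {
        'event_total': data_all['event_total'] + data_new['event_total'],
        'task_summary': pd.concat([data_all['task_summary'], data_new['task_summary']], ignore_index=True),
    }

batch_a = {'event_total': 100, 'task_summary': pd.DataFrame({'module_name': ['ping']})}
batch_b = {'event_total': 20, 'task_summary': pd.DataFrame({'module_name': ['copy']})}
acc = merge(pd.DataFrame(), batch_a)  # empty seed: batch_a passes through unchanged
acc = merge(acc, batch_b)
assert acc['event_total'] == 120
assert len(acc['task_summary']) == 2  # concatenated with a fresh index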
@@ -188,9 +210,12 @@ def prepare(self, dataframe):
playbook=('playbook', 'first'),
)

-return task_summary
+return {
+'event_total': event_total,
+'task_summary': task_summary,
+}

-def base(self, dataframe):
+def base(self, data):
"""
*Avg number of modules used in a playbook
*Failure/Success rate of modules
@@ -206,14 +231,18 @@
* Number of jobs executed that use a specific partner collection - TODO - not implemented yet, must be communicated


-dataframe corresponds to events joined with jobs
+data is a dict with 'event_total' and 'task_summary' dataframe
"""

+# Extract event_total and task_summary dataframe from the data structure
+event_total = data.get('event_total', 0)
+dataframe = data.get('task_summary', pd.DataFrame())
+
# TODO - ensure all columns are present in the dataframe, then let analysis run with empty data
if dataframe.empty:
return {
-'json': {},
-'rollup': {'aggregated': dataframe},
+'json': {'event_total': event_total},
+'rollup': {'aggregated': dataframe, 'event_total': event_total},
}

# Categorize columns to reduce memory footprint
@@ -265,7 +294,7 @@
avg_number_of_modules_used_in_a_playbooks = dataframe.groupby('playbook', observed=True)['module_name'].nunique().mean()
modules_used_per_playbook_total = dataframe.groupby('playbook', observed=True)['module_name'].nunique()

-total_hosts_automated = dataframe['host_id'].nunique()
+hosts_automated_total = dataframe['host_id'].nunique()

# Data is already aggregated from prepare() and merge()
# We just need to compute the mutually exclusive task status categories
@@ -368,7 +397,8 @@
# Prepare rollup data (dataframes before conversion)
rollup_data = {
'module_stats': module_stats,
-'total_hosts_automated': {'total_hosts_automated': total_hosts_automated},
+'hosts_automated_total': {'hosts_automated_total': hosts_automated_total},
+'event_total': event_total,
}

# Prepare JSON data (converted to dicts/lists)
@@ -378,7 +408,8 @@
'modules_used_per_playbook_total': modules_used_per_playbook_total.to_dict(),
'module_stats': merged_list_module,
'collection_name_stats': merged_list_collection_name,
-'total_hosts_automated': total_hosts_automated,
+'hosts_automated_total': hosts_automated_total,
+'event_total': event_total,
}

return {
metrics_utility/anonymized_rollups/execution_environments_anonymized_rollup.py
@@ -26,16 +26,16 @@ def base(self, dataframe):
'rollup': {'aggregated': dataframe},
}

-total_ee = int(len(dataframe))
+EE_total = int(len(dataframe))
dataframe['managed'] = dataframe['managed'].map({'t': True, 'f': False})
-default_ee = int(dataframe['managed'].sum())
-custom_ee = total_ee - default_ee
+EE_default_total = int(dataframe['managed'].sum())
+EE_custom_total = EE_total - EE_default_total

# Prepare JSON data (same as rollup for scalar values)
json_data = {
-'total_EE': total_ee,
-'default_EE': default_ee,
-'custom_EE': custom_ee,
+'EE_total': EE_total,
+'EE_default_total': EE_default_total,
+'EE_custom_total': EE_custom_total,
}

# Prepare rollup data (raw values before conversion)
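A free-standing sketch of the renamed counters; the 't'/'f' strings mirror the Postgres-style booleans that the map() call above implies, and the sample data is invented:

import pandas as pd

df = pd.DataFrame({'managed': ['t', 't', 'f', 'f', 'f']})
EE_total = int(len(df))                              # 5 environments in total
df['managed'] = df['managed'].map({'t': True, 'f': False})
EE_default_total = int(df['managed'].sum())          # 2 managed (default) EEs
EE_custom_total = EE_total - EE_default_total        # 3 custom EEs
assert (EE_total, EE_default_total, EE_custom_total) == (5, 2, 3)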
(job_host_summary rollup module)
@@ -1,3 +1,5 @@
+import pandas as pd
+
from metrics_utility.anonymized_rollups.base_anonymized_rollup import BaseAnonymizedRollup


@@ -10,6 +12,25 @@ def __init__(self):
super().__init__('job_host_summary')
self.collector_names = ['job_host_summary_service']

+def merge(self, data_all, data_new):
+"""
+Override merge to handle the new structure with jobhostsummary_total and aggregated data.
+Concatenates aggregated dataframes and sums jobhostsummary_total.
+"""
+# Handle initial empty DataFrame case (first iteration from load_anonymized_rollup_data)
+if isinstance(data_all, pd.DataFrame) and data_all.empty:
+return data_new
+
+# Handle initial empty dict case
+if isinstance(data_all, dict) and not data_all:
+return data_new
+
+# Concatenate aggregated dataframes and sum jobhostsummary_totals
+return {
+'jobhostsummary_total': data_all['jobhostsummary_total'] + data_new['jobhostsummary_total'],
+'aggregated': pd.concat([data_all['aggregated'], data_new['aggregated']], ignore_index=True),
+}
+
# prepare is called for each batch of data
# result of prepare is concatenated with other batches into one dataframe
# each dataframe in prepare should reduce the number of rows as much as possible
@@ -25,10 +46,16 @@ def __init__(self):
# rescued

def prepare(self, dataframe):
+# Count all records before processing
+jobhostsummary_total = len(dataframe)
+
# Aggregate by job_template_name and host_name to reduce data volume early
# This significantly improves performance when processing large batches
if dataframe.empty:
-return dataframe
+return {
+'jobhostsummary_total': jobhostsummary_total,
+'aggregated': dataframe,
+}

# Group by job_template_name and host_name, sum task columns, count jobs
aggregated = (
@@ -46,23 +73,32 @@
.reset_index()
)

-return aggregated
+return {
+'jobhostsummary_total': jobhostsummary_total,
+'aggregated': aggregated,
+}

-def base(self, dataframe):
+def base(self, data):
"""
Avg tasks by template (column job_template_name)
Number of tasks executed (sum of all tasks executed in dataframe)
Success ratio of tasks executed (ratio between ok and failed tasks (and others))

Success rate and average - this can compute SaaS team from the metrics

+data is a dict with 'jobhostsummary_total' and 'aggregated' dataframe
"""

+# Extract jobhostsummary_total and aggregated dataframe from the data structure
+jobhostsummary_total = data.get('jobhostsummary_total', 0)
+dataframe = data.get('aggregated', pd.DataFrame())
+
+# Return empty result if dataframe is empty
# TODO - ensure all columns are present in the dataframe, then let analysis run with empty data
if dataframe.empty:
return {
-'json': [],
-'rollup': {'aggregated': dataframe},
+'json': {'jobhostsummary_total': jobhostsummary_total},
+'rollup': {'aggregated': dataframe, 'jobhostsummary_total': jobhostsummary_total},
}

# Re-aggregate in case multiple batches had overlapping template+host combinations
@@ -80,22 +116,24 @@
.reset_index()
)

-total_unique_hosts = set().union(*aggregated['unique_hosts'])
+unique_hosts_total = set().union(*aggregated['unique_hosts'])
# drop unique_hosts column
aggregated = aggregated.drop(columns=['unique_hosts'])

# Prepare rollup data (dataframe before conversion)
rollup_data = {
# pandas.DataFrame
'aggregated': aggregated,
+'jobhostsummary_total': jobhostsummary_total,
}

# Prepare JSON data (converted to list of dicts)
# json_data = aggregated.to_dict(orient='records')

json_data = {
-'total_unique_hosts': len(total_unique_hosts),
+'unique_hosts_total': len(unique_hosts_total),
'aggregated': aggregated.to_dict(orient='records'),
+'jobhostsummary_total': jobhostsummary_total,
}

return {
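A free-standing sketch of the unique-host counting above: each aggregated row carries a set of host names, and the set union de-duplicates hosts that appear under several templates (the names are invented):

import pandas as pd

aggregated = pd.DataFrame({
    'job_template_name': ['backup', 'deploy'],
    'unique_hosts': [{'h1', 'h2'}, {'h2', 'h3'}],
})
unique_hosts_total = set().union(*aggregated['unique_hosts'])
assert len(unique_hosts_total) == 3  # 'h2' is counted only once
aggregated = aggregated.drop(columns=['unique_hosts'])  # sets are not JSON-serializable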
(test file: events modules rollup)
@@ -350,7 +350,7 @@ def test_events_modules_aggregations_basic():
'maintenance.yml': 1,
}

-assert result['total_hosts_automated'] == 9
+assert result['hosts_automated_total'] == 9

# collection stats assertions (current aggregation schema)
coll_by_name = {row['collection_name']: row for row in result['collection_name_stats']}
(test file: execution environments rollup)
@@ -22,6 +22,6 @@ def test_base_counts():
result = result['json']

# Expected values
-assert result['total_EE'] == 5
-assert result['default_EE'] == 2 # two True
-assert result['custom_EE'] == 3 # total - default
+assert result['EE_total'] == 5
+assert result['EE_default_total'] == 2 # two True
+assert result['EE_custom_total'] == 3 # total - default
(test file: end-to-end gather-to-JSON)
@@ -78,22 +78,22 @@ def test_from_gather_to_json(cleanup_glob):
assert isinstance(statistics, dict), 'statistics should be a dictionary'
assert 'modules_used_to_automate_total' in statistics
assert 'avg_number_of_modules_used_in_a_playbooks' in statistics
-assert 'total_hosts_automated' in statistics
-assert 'total_EE' in statistics
-assert 'default_EE' in statistics
-assert 'custom_EE' in statistics
+assert 'hosts_automated_total' in statistics
+assert 'EE_total' in statistics
+assert 'EE_default_total' in statistics
+assert 'EE_custom_total' in statistics
assert 'jobs_total' in statistics
-assert 'total_unique_hosts' in statistics
+assert 'unique_hosts_total' in statistics

# Validate statistics data types
assert isinstance(statistics['modules_used_to_automate_total'], int)
assert isinstance(statistics['avg_number_of_modules_used_in_a_playbooks'], (int, float))
-assert isinstance(statistics['total_hosts_automated'], int)
-assert isinstance(statistics['total_EE'], int)
-assert isinstance(statistics['default_EE'], int)
-assert isinstance(statistics['custom_EE'], int)
+assert isinstance(statistics['hosts_automated_total'], int)
+assert isinstance(statistics['EE_total'], int)
+assert isinstance(statistics['EE_default_total'], int)
+assert isinstance(statistics['EE_custom_total'], int)
assert isinstance(statistics['jobs_total'], int)
-assert isinstance(statistics['total_unique_hosts'], int)
+assert isinstance(statistics['unique_hosts_total'], int)

# Validate arrays structure
assert isinstance(json_data['modules_used_per_playbook'], list), 'modules_used_per_playbook should be a list'
@@ -125,7 +125,7 @@ def test_from_gather_to_json(cleanup_glob):
# Validate statistics actual values
print('\n--- Validating statistics data values ---')
assert statistics['modules_used_to_automate_total'] == 1, 'Should have 1 module'
-assert statistics['total_hosts_automated'] == 2, 'Should have 2 hosts automated'
+assert statistics['hosts_automated_total'] == 2, 'Should have 2 hosts automated'
assert len(json_data['module_stats']) == 1, 'Should have 1 module stats'
assert len(json_data['collection_name_stats']) == 1, 'Should have 1 collection stats'

@@ -167,11 +167,11 @@ def test_from_gather_to_json(cleanup_glob):

# Validate execution_environments actual values
print('--- Validating execution_environments data values ---')
-assert statistics['total_EE'] == 2, 'Should have 2 total execution environments'
-assert statistics['default_EE'] == 1, 'Should have 1 default execution environment'
-assert statistics['custom_EE'] == 1, 'Should have 1 custom execution environment'
+assert statistics['EE_total'] == 2, 'Should have 2 total execution environments'
+assert statistics['EE_default_total'] == 1, 'Should have 1 default execution environment'
+assert statistics['EE_custom_total'] == 1, 'Should have 1 custom execution environment'
# Validate that total = default + custom
-assert statistics['total_EE'] == statistics['default_EE'] + statistics['custom_EE'], 'Total EE should equal default + custom'
+assert statistics['EE_total'] == statistics['EE_default_total'] + statistics['EE_custom_total'], 'Total EE should equal default + custom'

# Validate jobs actual values
print('--- Validating jobs data values ---')
@@ -199,7 +199,7 @@ def test_from_gather_to_json(cleanup_glob):
job_host_summary = json_data['job_host_summary']
assert isinstance(job_host_summary, list), 'job_host_summary should be a list'
assert len(job_host_summary) == 1, 'Should have 1 job_host_summary entry'
-assert statistics['total_unique_hosts'] == 2, 'Should have 2 unique hosts'
+assert statistics['unique_hosts_total'] == 2, 'Should have 2 unique hosts'

jhs = job_host_summary[0]
assert 'job_template_name' in jhs
@@ -220,7 +220,7 @@ def test_from_gather_to_json(cleanup_glob):
print('--- Validating cross-section data consistency ---')
# Validate that module stats hosts match the total automated hosts
for module_stat in json_data['module_stats']:
-assert module_stat['hosts_total'] <= statistics['total_hosts_automated'], (
+assert module_stat['hosts_total'] <= statistics['hosts_automated_total'], (
f'Module {module_stat["module_name"][:50]} hosts should not exceed total automated hosts'
)

Expand Down
Loading