8 changes: 4 additions & 4 deletions metrics_utility/anonymized_rollups/base_anonymized_rollup.py
@@ -8,16 +8,16 @@
 import pandas as pd
 
 from metrics_utility.anonymized_rollups.helpers import sanitize_json
+from metrics_utility.library.dataframes import BaseDataframe
 
 
-class BaseAnonymizedRollup:
+class BaseAnonymizedRollup(BaseDataframe):
     def __init__(self, rollup_name: str):
+        super().__init__()
+
         self.rollup_name = rollup_name
         self.collector_names = []
 
-    def merge(self, dataframe_all, dataframe_new):
-        return pd.concat([dataframe_all, dataframe_new], ignore_index=True)
-
     def rollup(self, dataframe_all, dataframe_new):
         # not implemented in base class, return empty dataframe
         return pd.DataFrame()
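[Review note] For orientation, a minimal sketch of a concrete rollup under the new hierarchy. The subclass name and collector are hypothetical; only the constructor contract (rollup_name plus the super().__init__() call) comes from this diff:

    class HostMetricAnonymizedRollup(BaseAnonymizedRollup):
        def __init__(self):
            super().__init__('host_metric')              # rollup_name, per the base __init__
            self.collector_names = ['main_hostmetric']   # collectors this rollup reads (hypothetical)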
181 changes: 42 additions & 139 deletions metrics_utility/automation_controller_billing/dataframe_engine/base.py
@@ -1,9 +1,5 @@
 import datetime
 
-from functools import reduce
-
-import pandas as pd
-
 from dateutil.relativedelta import relativedelta

@@ -39,70 +35,59 @@
     return dates_arr
 
 
-# For JSON/dict columns: update one dict with the other (later values overwrite earlier ones)
-def combine_json(json1, json2):
-    merged = {}
-    if isinstance(json1, dict):
-        merged.update(json1)
-    if isinstance(json2, dict):
-        merged.update(json2)
-    return merged
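[Review note] Semantics of the removed combine_json helper, derived from the code above; the second dict wins on shared keys, and non-dict inputs are ignored:

    combine_json({'a': 1}, {'a': 2, 'b': 3})   # -> {'a': 2, 'b': 3}
    combine_json(None, {'b': 3})               # -> {'b': 3}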


-# For set columns: take the union of the two sets
-def combine_set(set1, set2):
-    """
-    Combine two collections (set or list) into a single set of unique items.
-    If an input is a list, it is first converted to a set.
-    If an input is not a list or a set, it is treated as empty.
-    """
-    # Convert to set if input is a list; otherwise, if not a set, default to an empty set.
-    if isinstance(set1, list):
-        set1 = set(set1)
-    elif not isinstance(set1, set):
-        set1 = set()
-
-    if isinstance(set2, list):
-        set2 = set(set2)
-    elif not isinstance(set2, set):
-        set2 = set()
-
-    # Return the union of both sets.
-    return set1.union(set2)
-
-
-def merge_sets(x):
-    return set().union(*x)
-
-
-def merge_setdicts(x):
-    return reduce(combine_json_values, x, {})
-
-
-# Helper function to combine two JSON values.
-# For each key, it builds a set of non-null, non-empty values from both inputs.
-def combine_json_values(val1, val2):
-    merged = {}
-    for d in [val1, val2]:
-        if isinstance(d, dict):
-            for key, value in d.items():
-                if value is not None and value != '':
-                    if isinstance(value, set):
-                        merged.setdefault(key, set()).update(value)
-                    else:
-                        merged.setdefault(key, set()).add(value)
-
-    return merged
-
-
 class Base:
-    def __init__(self, extractor, month, extra_params):
+    def __init__(self, extractor, month, extra_params, klass):
         self.extractor = extractor
         self.month = month
         self.extra_params = extra_params
+        self.klass = klass
 
     def build_dataframe(self):
-        pass
+        o = self.klass()
+        o.from_tarballs(self.iter_batches(o.TARBALL_NAMES))
+        if o.rollup is not None:
+            return o.rollup
+        return o.empty()
+
+    def dedup(self, dataframe, hostname_mapping=None, scope_dataframe=None):
+        return self.klass().dedup(dataframe, hostname_mapping=hostname_mapping, scope_dataframe=scope_dataframe, deduplicator=self.extra_params.get('deduplicator'))

[Check failure: GitHub Actions / pr-checks, Ruff (E501)] metrics_utility/automation_controller_billing/dataframe_engine/base.py:53:151: Line too long (164 > 150)
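[Review note] A mechanical fix for the E501 failure would be to wrap the dedup call across lines; behavior unchanged:

    def dedup(self, dataframe, hostname_mapping=None, scope_dataframe=None):
        return self.klass().dedup(
            dataframe,
            hostname_mapping=hostname_mapping,
            scope_dataframe=scope_dataframe,
            deduplicator=self.extra_params.get('deduplicator'),
        )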

+    def iter_batches(self, names):
+        collections = []
+        optional = []
+        datas = map(lambda x: x.replace('.csv', '').replace('.json', ''), names)
+        names = [*names]
+
+        if 'config.json' in names:
+            optional.append('config')
+            names.remove('config.json')
+        if 'data_collection_status.csv' in names:
+            optional.append('data_collection_status')
+            names.remove('data_collection_status.csv')
+
+        collections = list(map(lambda x: x.replace('.csv', ''), names))
+        if len(collections) == 0:
+            collections = None
+
+        for date in self.dates():
+            for data in self.extractor.iter_batches(date=date, collections=collections, optional=optional):
+                tup = tuple()
+                nonempty = 0
+
+                for name in datas:
+                    batch = data[name]
+                    tup = (*tup, batch)
+
+                    if name != 'config' and not batch.empty:
+                        nonempty += 1
+
+                if nonempty < 1:
+                    continue
+
+                if len(tup) == 1:
+                    tup = tup[0]
+
+                yield tup
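[Review note] datas is a one-shot map() iterator, so the inner loop over datas exhausts it on the first batch; every later batch then sees zero names, leaves nonempty at 0, and is skipped. If that is not intended, materializing it as a list would fix it:

    datas = [x.replace('.csv', '').replace('.json', '') for x in names]   # survives repeated iteration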

     def dates(self):
         if self.extra_params.get('since_date') is not None:
@@ -114,85 +99,3 @@
 
         dates_list = list_dates(start_date=beginning_of_the_month, end_date=end_of_the_month, granularity='daily')
         return dates_list

-    def cast_dataframe(self, df, types):
-        levels = []
-        if len(self.unique_index_columns()) == 1:
-            # Special behavior if the index is not composite, but only 1 column
-            # Casting index field to object
-            df.index = df.index.astype(object)
-        else:
-            # Composite index branch
-            # Casting index field to object
-            for index, _level in enumerate(df.index.levels):
-                casted_level = df.index.levels[index].astype(object)
-                levels.append(casted_level)
-
-            df.index = df.index.set_levels(levels)
-
-        return df.astype(types)
-
-    def summarize_merged_dataframes(self, df, columns, operations={}):
-        for col in columns:
-            if operations.get(col) == 'min':
-                df[col] = df[[f'{col}_x', f'{col}_y']].min(axis=1)
-            elif operations.get(col) == 'max':
-                df[col] = df[[f'{col}_x', f'{col}_y']].max(axis=1)
-            elif operations.get(col) == 'combine_set':
-                df[col] = df.apply(lambda row: combine_set(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            elif operations.get(col) == 'combine_json':
-                df[col] = df.apply(lambda row: combine_json(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            elif operations.get(col) == 'combine_json_values':
-                df[col] = df.apply(lambda row: combine_json_values(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            else:
-                df[col] = df[[f'{col}_x', f'{col}_y']].sum(axis=1)
-            del df[f'{col}_x']
-            del df[f'{col}_y']
-        return df
-
-    def empty(self):
-        return pd.DataFrame(columns=self.unique_index_columns() + self.data_columns())
-
-    # Multipart collection, merge the dataframes and sum counts
-    def merge(self, rollup, new_group):
-        if rollup is None:
-            return new_group
-
-        rollup = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')
-        rollup = self.summarize_merged_dataframes(rollup, self.data_columns(), operations=self.operations())
-        return self.cast_dataframe(rollup, self.cast_types())
-
-    def dedup(self, dataframe, hostname_mapping=None):
-        if dataframe is None or dataframe.empty:
-            return self.empty()
-
-        if not hostname_mapping:
-            return dataframe
-
-        # map hostnames to canonical value
-        df = dataframe.copy()
-
-        df['host_name'] = df['host_name'].map(hostname_mapping).fillna(df['host_name'])
-
-        # multiple rows can now have the same hostname, regroup
-        df_grouped = self.regroup(df)
-
-        # cast types to match the table
-        df_grouped = self.cast_dataframe(df_grouped, self.cast_types())
-        return df_grouped.reset_index()
-
-    @staticmethod
-    def unique_index_columns():
-        pass
-
-    @staticmethod
-    def data_columns():
-        pass
-
-    @staticmethod
-    def cast_types():
-        pass
-
-    @staticmethod
-    def operations():
-        pass
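[Review note] How the reworked engine appears to be driven, per build_dataframe above. The extractor, month, and rollup class here are placeholders; only Base's signature and the TARBALL_NAMES/from_tarballs protocol come from this diff:

    engine = Base(extractor=extractor, month=month, extra_params={}, klass=SomeRollupDataframe)
    df = engine.build_dataframe()                      # streams per-day tarball batches into klass().from_tarballs()
    df = engine.dedup(df, hostname_mapping=mapping)    # dedup now delegates to the klass, too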

[A further file in this PR was deleted entirely; its path is not shown in this excerpt.]
