import os
import yaml
import pandas as pd
import numpy as np

from backend.helpers import check_empty_results, simulate_mock_jobs
from backend.slurm_extract import WorkloadManager

# print("Working dir1: ", os.getcwd())  # DEBUGONLY

class GA_tools:

    def __init__(self, cluster_info, fParams):
        self.cluster_info = cluster_info
        self.fParams = fParams

    def calculate_energies(self, row):
        '''
        Calculate the energy usage based on the job's parameters
        :param row: [pd.Series] one row of usage statistics, corresponding to one job
        :return: [pd.Series] the same statistics with the energies added
        '''
        ### CPU and GPU
        partition_info = self.cluster_info['partitions'][row.PartitionX]
        if row.PartitionTypeX == 'CPU':
            TDP2use4CPU = partition_info['TDP']
            TDP2use4GPU = 0
        else:
            TDP2use4CPU = partition_info['TDP_CPU']
            TDP2use4GPU = partition_info['TDP']

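        # Energy in kWh: run time (hours) x TDP (W) / 1000, calculated separately for CPUs and GPUs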
        row['energy_CPUs'] = row.TotalCPUtime2useX.total_seconds() / 3600 * TDP2use4CPU / 1000  # in kWh

        row['energy_GPUs'] = row.TotalGPUtime2useX.total_seconds() / 3600 * TDP2use4GPU / 1000  # in kWh

        ### memory
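        # Two scenarios: '' uses the memory requested (ReqMemX), '_memoryNeededOnly' uses the memory
        # actually needed (NeededMemX), so the impact of memory overallocation can be quantified.
        # The total is then scaled by the data centre's PUE.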
        for suffix, memory2use in zip(['', '_memoryNeededOnly'], [row.ReqMemX, row.NeededMemX]):
            row[f'energy_memory{suffix}'] = row.WallclockTimeX.total_seconds() / 3600 * memory2use * self.fParams['power_memory_perGB'] / 1000  # in kWh
            row[f'energy{suffix}'] = (row.energy_CPUs + row.energy_GPUs + row[f'energy_memory{suffix}']) * self.cluster_info['PUE']  # in kWh

        return row

    def calculate_carbonFootprint(self, df, col_energy):
        '''
        Convert energy (kWh) into a carbon footprint using the cluster's carbon intensity (CI).
        :param df: [pd.DataFrame] usage statistics with energy columns already calculated
        :param col_energy: [str] name of the energy column to convert
        :return: [pd.Series] carbon footprint for each job
        '''
        return df[col_energy] * self.cluster_info['CI']


def extract_data(args, cluster_info):

    if args.use_mock_agg_data:  # DEBUGONLY

        if args.reportBug or args.reportBugHere:
            print("\n(!) --reportBug and --reportBugHere are ignored when mock aggregated data is used\n")

        # df2 = simulate_mock_jobs()
        # df2.to_pickle("testData/df_agg_X_mockMultiUsers_1.pkl")

        # mock_data_path = 'testData/df_agg_test_3.pkl'
        mock_data_path = 'testData/df_agg_X_1.pkl'
        print(f"Overriding df_agg with `{mock_data_path}`")
        return pd.read_pickle(mock_data_path)


    ### Pull usage statistics from the workload manager
    WM = WorkloadManager(args, cluster_info)
    WM.pull_logs()

    ### Log the output for debugging
    # TODO cleanup file/dir management here
    scripts_dir = os.path.dirname(os.path.realpath(__file__))
    if args.reportBug or args.reportBugHere:

        # log_name = str(datetime.datetime.now().timestamp()).replace(".", "_")

        if args.reportBug:
            # Create an error_logs subfolder in the output dir
            errorLogsDir = os.path.join(args.outputDir2use['path'], 'error_logs')
            os.makedirs(errorLogsDir, exist_ok=True)
            log_path = os.path.join(errorLogsDir, 'sacctOutput.csv')
        else:
            # i.e. args.reportBugHere is True
            log_path = f"{args.userCWD}/sacctOutput_{args.outputDir2use['timestamp']}.csv"

        with open(log_path, 'wb') as f:
            f.write(WM.logs_raw)
        print(f"\nSLURM statistics logged for debugging: {log_path}\n")

    ### Turn usage logs into DataFrame
    WM.convert2dataframe()

    # And clean
    WM.clean_logs_df()
    # Check if there are any jobs during the period from this directory and with these jobIDs
    check_empty_results(WM.df_agg, args)

    # Check that there is only one user's data
    if len(set(WM.df_agg_X.UserX)) > 1:
        raise ValueError(f"More than one user's logs were included: {set(WM.df_agg_X.UserX)}")

    # WM.df_agg_X.to_pickle("testData/df_agg_X_1.pkl")  # DEBUGONLY used to test different steps offline

    return WM.df_agg_X

def enrich_data(df, fParams, GA):

    ### energy
    df = df.apply(GA.calculate_energies, axis=1)

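    # Energy wasted on failed jobs (StateX is 0 for failed jobs, 1 for successful ones)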
    df['energy_failedJobs'] = np.where(df.StateX == 0, df.energy, 0)

    ### carbon footprint
    for suffix in ['', '_memoryNeededOnly', '_failedJobs']:
        df[f'carbonFootprint{suffix}'] = GA.calculate_carbonFootprint(df, f'energy{suffix}')
        # Context metrics (part 1)
        df[f'treeMonths{suffix}'] = df[f'carbonFootprint{suffix}'] / fParams['tree_month']
        df[f'cost{suffix}'] = df[f'energy{suffix}'] * fParams['electricity_cost']  # TODO use realtime electricity costs

    ### Context metrics (part 2)
    df['driving'] = df.carbonFootprint / fParams['passengerCar_EU_perkm']
    df['flying_NY_SF'] = df.carbonFootprint / fParams['flight_NY_SF']
    df['flying_PAR_LON'] = df.carbonFootprint / fParams['flight_PAR_LON']
    df['flying_NYC_MEL'] = df.carbonFootprint / fParams['flight_NYC_MEL']

    return df

def summarise_data(df, args):
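    # Named-aggregation spec: output column -> (source column, aggregation function),
    # used with DataFrame.groupby(...).agg(**...) below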
    agg_functions_from_raw = {
        'n_jobs': ('UserX', 'count'),
        'first_job_period': ('SubmitDatetimeX', 'min'),
        'last_job_period': ('SubmitDatetimeX', 'max'),
        'energy': ('energy', 'sum'),
        'energy_CPUs': ('energy_CPUs', 'sum'),
        'energy_GPUs': ('energy_GPUs', 'sum'),
        'energy_memory': ('energy_memory', 'sum'),
        'carbonFootprint': ('carbonFootprint', 'sum'),
        'carbonFootprint_memoryNeededOnly': ('carbonFootprint_memoryNeededOnly', 'sum'),
        'carbonFootprint_failedJobs': ('carbonFootprint_failedJobs', 'sum'),
        'cpuTime': ('TotalCPUtime2useX', 'sum'),
        'gpuTime': ('TotalGPUtime2useX', 'sum'),
        'wallclockTime': ('WallclockTimeX', 'sum'),
        'CPUhoursCharged': ('CPUhoursChargedX', 'sum'),
        'GPUhoursCharged': ('GPUhoursChargedX', 'sum'),
        'memoryRequested': ('ReqMemX', 'sum'),
        'memoryOverallocationFactor': ('memOverallocationFactorX', 'mean'),
        'n_success': ('StateX', 'sum'),
        'treeMonths': ('treeMonths', 'sum'),
        'treeMonths_memoryNeededOnly': ('treeMonths_memoryNeededOnly', 'sum'),
        'treeMonths_failedJobs': ('treeMonths_failedJobs', 'sum'),
        'driving': ('driving', 'sum'),
        'flying_NY_SF': ('flying_NY_SF', 'sum'),
        'flying_PAR_LON': ('flying_PAR_LON', 'sum'),
        'flying_NYC_MEL': ('flying_NYC_MEL', 'sum'),
        'cost': ('cost', 'sum'),
        'cost_failedJobs': ('cost_failedJobs', 'sum'),
        'cost_memoryNeededOnly': ('cost_memoryNeededOnly', 'sum'),
    }

    # This is to aggregate an already-aggregated dataset (so the column names differ slightly)
    agg_functions_further = agg_functions_from_raw.copy()
    agg_functions_further['n_jobs'] = ('n_jobs', 'sum')
    agg_functions_further['first_job_period'] = ('first_job_period', 'min')
    agg_functions_further['last_job_period'] = ('last_job_period', 'max')
    agg_functions_further['cpuTime'] = ('cpuTime', 'sum')
    agg_functions_further['gpuTime'] = ('gpuTime', 'sum')
    agg_functions_further['wallclockTime'] = ('wallclockTime', 'sum')
    agg_functions_further['CPUhoursCharged'] = ('CPUhoursCharged', 'sum')
    agg_functions_further['GPUhoursCharged'] = ('GPUhoursCharged', 'sum')
    agg_functions_further['memoryRequested'] = ('memoryRequested', 'sum')
    agg_functions_further['memoryOverallocationFactor'] = ('memoryOverallocationFactor', 'mean')  # NB: not strictly correct to take a mean of means, but acceptable here
    agg_functions_further['n_success'] = ('n_success', 'sum')

    def agg_jobs(data, agg_names=None):
        """
        Aggregate job-level (or already-aggregated) usage statistics.

        :param data: [pd.DataFrame] usage statistics, either raw job-level data or the output of a previous aggregation
        :param agg_names: [list] columns to group by; if None, then the whole dataset is aggregated into a single row
        :return: [pd.DataFrame] aggregated statistics
        """
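        # Grouping by a function that always returns True puts every row into a single group,
        # so the whole dataset is aggregated at once when agg_names is None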
        agg_names2 = agg_names if agg_names else lambda _: True
        if 'UserX' in data.columns:
            timeseries = data.groupby(agg_names2).agg(**agg_functions_from_raw)
        else:
            timeseries = data.groupby(agg_names2).agg(**agg_functions_further)

        timeseries.reset_index(inplace=True, drop=(agg_names is None))
        timeseries['success_rate'] = timeseries.n_success / timeseries.n_jobs
        timeseries['failure_rate'] = 1 - timeseries.success_rate
        timeseries['share_carbonFootprint'] = timeseries.carbonFootprint / timeseries.carbonFootprint.sum()

        return timeseries

    df['SubmitDate'] = df.SubmitDatetimeX.dt.date  # TODO do it with real start time rather than submit day

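    # Aggregate twice: first per submission day, then collapse those daily rows into one overall set of user statistics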
    df_userdaily = agg_jobs(df, ['SubmitDate'])
    df_overallStats = agg_jobs(df_userdaily)
    dict_overallStats = df_overallStats.iloc[0, :].to_dict()
    userID = df.UserX.iloc[0]

    output = {
        "userDaily": df_userdaily,
        "userActivity": {userID: dict_overallStats},
        "user": userID
    }

    # Some job-level statistics to plot distributions
    memoryOverallocationFactors = df.groupby('UserX')['memOverallocationFactorX'].apply(list).to_dict()
    memoryOverallocationFactors['overall'] = df.memOverallocationFactorX.to_numpy()
    output['memoryOverallocationFactors'] = memoryOverallocationFactors

    return output


def main_backend(args):
    '''
    Run the full pipeline: load configuration, pull and clean usage logs from the workload manager,
    compute energy and carbon metrics, and summarise them per user and per day.
    :param args: arguments namespace (command-line options, or a similar structure as in __main__ below)
    :return: [dict] summary statistics, as returned by summarise_data()
    '''
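    # Keys read from the two YAML files below (structure inferred from this script; values are placeholders):
    #
    # cluster_info.yaml:
    #   PUE: ...            # power usage effectiveness of the data centre
    #   CI: ...             # carbon intensity used to convert kWh into a carbon footprint
    #   partitions:
    #     <partition name>:
    #       TDP: ...        # TDP used for the partition's CPUs (CPU partitions) or GPUs (GPU partitions)
    #       TDP_CPU: ...    # TDP of the host CPUs (GPU partitions only)
    #
    # fixed_parameters.yaml:
    #   power_memory_perGB: ...
    #   tree_month: ...
    #   electricity_cost: ...
    #   passengerCar_EU_perkm: ...
    #   flight_NY_SF: ...
    #   flight_PAR_LON: ...
    #   flight_NYC_MEL: ...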
    ### Load cluster specific info
    with open(os.path.join(args.path_infrastucture_info, 'cluster_info.yaml'), "r") as stream:
        try:
            cluster_info = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)

    ### Load fixed parameters
    with open("data/fixed_parameters.yaml", "r") as stream:
        try:
            fParams = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)

    GA = GA_tools(cluster_info, fParams)

    df = extract_data(args, cluster_info=cluster_info)
    df2 = enrich_data(df, fParams=fParams, GA=GA)
    summary_stats = summarise_data(df2, args=args)

    return summary_stats

if __name__ == "__main__":

    #### This is used for testing only ####

    from collections import namedtuple
    argStruct = namedtuple('argStruct',
                           'startDay endDay use_mock_agg_data useCustomLogs customSuccessStates filterWD filterJobIDs filterAccount reportBug reportBugHere path_infrastucture_info')
    args = argStruct(
        startDay='2022-01-01',
        endDay='2023-06-30',
        useCustomLogs=None,
        use_mock_agg_data=True,
        customSuccessStates='',
        filterWD=None,
        filterJobIDs='all',
        filterAccount=None,
        reportBug=False,
        reportBugHere=False,
        path_infrastucture_info="clustersData/CSD3",
    )

    main_backend(args)