
Commit 470d246

Resource Allocation Insertion into DB
- Added functionality for updating resource allocation data in the DB in qiita_db/util.py
- Added tests for the new functionality in qiita_db/test/test_util.py
- Moved the MaxRSS_helper function from qiita_core/util.py to qiita_db/util.py
- Moved the MaxRSS_helper test from qiita_core/tests/test_util.py to qiita_db/test/test_util.py
1 parent 4fc8735 commit 470d246

File tree

6 files changed: +294 -29 lines

- notebooks/resource-allocation/generate-allocation-summary-arrays.py
- notebooks/resource-allocation/generate-allocation-summary.py
- qiita_core/tests/test_util.py
- qiita_core/util.py
- qiita_db/test/test_util.py
- qiita_db/util.py


notebooks/resource-allocation/generate-allocation-summary-arrays.py

+1 -1

@@ -1,4 +1,4 @@
-from qiita_core.util import MaxRSS_helper
+from qiita_db.util import MaxRSS_helper
 from qiita_db.software import Software
 import datetime
 from io import StringIO

notebooks/resource-allocation/generate-allocation-summary.py

+1 -1

@@ -5,7 +5,7 @@
 from json import loads
 from os.path import join

-from qiita_core.util import MaxRSS_helper
+from qiita_db.util import MaxRSS_helper
 from qiita_db.exceptions import QiitaDBUnknownIDError
 from qiita_db.processing_job import ProcessingJob
 from qiita_db.software import Software

qiita_core/tests/test_util.py

+1 -15

@@ -10,7 +10,7 @@

 from qiita_core.util import (
     qiita_test_checker, execute_as_transaction, get_qiita_version,
-    is_test_environment, get_release_info, MaxRSS_helper)
+    is_test_environment, get_release_info)
 from qiita_db.meta_util import (
     generate_biom_and_metadata_release, generate_plugin_releases)
 import qiita_db as qdb
@@ -82,20 +82,6 @@ def test_get_release_info(self):
         self.assertEqual(biom_metadata_release, ('', '', ''))
         self.assertNotEqual(archive_release, ('', '', ''))

-    def test_MaxRSS_helper(self):
-        tests = [
-            ('6', 6.0),
-            ('6K', 6000),
-            ('6M', 6000000),
-            ('6G', 6000000000),
-            ('6.9', 6.9),
-            ('6.9K', 6900),
-            ('6.9M', 6900000),
-            ('6.9G', 6900000000),
-        ]
-        for x, y in tests:
-            self.assertEqual(MaxRSS_helper(x), y)
-

 if __name__ == '__main__':
     main()

qiita_core/util.py

-12

@@ -151,15 +151,3 @@ def get_release_info(study_status='public'):
         archive_release = ((md5sum, filepath, timestamp))

     return (biom_metadata_release, archive_release)
-
-
-def MaxRSS_helper(x):
-    if x[-1] == 'K':
-        y = float(x[:-1]) * 1000
-    elif x[-1] == 'M':
-        y = float(x[:-1]) * 1000000
-    elif x[-1] == 'G':
-        y = float(x[:-1]) * 1000000000
-    else:
-        y = float(x)
-    return y

qiita_db/test/test_util.py

+73

@@ -1368,6 +1368,79 @@ def test_minimize_const(self):
             doesn't match""")
         self.assertEqual(failures, 1, "Number of failures must be 1")

+    def test_MaxRSS_helper(self):
+        tests = [
+            ('6', 6.0),
+            ('6K', 6000),
+            ('6M', 6000000),
+            ('6G', 6000000000),
+            ('6.9', 6.9),
+            ('6.9K', 6900),
+            ('6.9M', 6900000),
+            ('6.9G', 6900000000),
+        ]
+        for x, y in tests:
+            self.assertEqual(qdb.util.MaxRSS_helper(x), y)
+
+    def test_db_update(self):
+
+        def read_slurm(id):
+            sacct_dummies = {
+                1005932: (
+                    "JobID|ElapsedRaw|MaxRSS|Submit|Start|MaxRSS|CPUTimeRAW|"
+                    "ReqMem|AllocCPUS|AveVMSize|\n"
+                    "1005932|165||2023-02-23T14:55:07|2023-02-23T14:55:08|"
+                    "|165|120Gn|1||\n"
+                    "1005932.batch|165|328716K|2023-02-23T14:55:08"
+                    "|2023-02-23T14:55:08|328716K|"
+                    "165|120Gn|1|156284K|\n"
+                    "1005932.extern|165|0|2023-02-23T14:55:08|"
+                    "2023-02-23T14:55:08|0|165|120Gn|1|108052K|"
+                ),
+                1001100: (
+                    "JobID|ElapsedRaw|MaxRSS|Submit|Start|MaxRSS|"
+                    "CPUTimeRAW|ReqMem|AllocCPUS|AveVMSize|\n"
+                    "1001100|219||2023-02-22T11:05:26|"
+                    "2023-02-22T11:05:27||219|120Gn|1||\n"
+                    "1001100.batch|219|342204K|2023-02-22T11:05:27|"
+                    "2023-02-22T11:05:27|342204K|219|120Gn|1|156284K|\n"
+                    "1001100.extern|219|0|2023-02-22T11:05:27|"
+                    "2023-02-22T11:05:27|0|219|120Gn|1|108052K|"
+                )
+            }
+
+            slurm_info = sacct_dummies.get(id)
+            return slurm_info
+
+        types = {
+            'Split libraries FASTQ': [
+                '6d368e16-2242-4cf8-87b4-a5dc40bb890b',
+                '4c7115e8-4c8e-424c-bf25-96c292ca1931',
+                'b72369f9-a886-4193-8d3d-f7b504168e75',
+                '46b76f74-e100-47aa-9bf2-c0208bcea52d',
+                '6ad4d590-4fa3-44d3-9a8f-ddbb472b1b5f'],
+            'Pick closed-reference OTUs': [
+                '3c9991ab-6c14-4368-a48c-841e8837a79c',
+                '80bf25f3-5f1d-4e10-9369-315e4244f6d5',
+                '9ba5ae7a-41e1-4202-b396-0259aeaac366',
+                'e5609746-a985-41a1-babf-6b3ebe9eb5a9',
+            ],
+            'Single Rarefaction': [
+                '8a7a8461-e8a1-4b4e-a428-1bc2f4d3ebd0'
+            ]
+        }
+
+        qdb.util.update_resource_allocation_table(test=read_slurm)
+
+        for curr_cname, ids in types.items():
+            updated_df = qdb.util._retrieve_resource_data(
+                curr_cname, self.SNAME, self.columns)
+            updated_ids_set = set(updated_df['processing_job_id'])
+            previous_ids_set = set(self.df['processing_job_id'])
+            for id in ids:
+                self.assertTrue(id in updated_ids_set)
+                self.assertFalse(id in previous_ids_set)
+

 STUDY_INFO = {
     'study_id': 1,
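
For context, a small standalone sketch (not part of the commit) of what the read_slurm stub above stands in for: pipe-delimited sacct output parsed with pandas, which is the same parsing path update_resource_allocation_table uses. The job id and values below are the test dummies, not real cluster data.

from io import StringIO
import pandas as pd

# Dummy sacct -p style output (one job plus its .batch and .extern steps).
sacct_output = (
    "JobID|ElapsedRaw|MaxRSS|Submit|Start|MaxRSS|CPUTimeRAW|"
    "ReqMem|AllocCPUS|AveVMSize|\n"
    "1005932|165||2023-02-23T14:55:07|2023-02-23T14:55:08||165|120Gn|1||\n"
    "1005932.batch|165|328716K|2023-02-23T14:55:08|2023-02-23T14:55:08|"
    "328716K|165|120Gn|1|156284K|\n"
    "1005932.extern|165|0|2023-02-23T14:55:08|2023-02-23T14:55:08|"
    "0|165|120Gn|1|108052K|"
)

# Same parsing used by the new function: one dataframe row per slurm step.
df = pd.read_csv(StringIO(sacct_output), sep='|')
print(df[['JobID', 'ElapsedRaw', 'Submit', 'Start']])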

qiita_db/util.py

+218

@@ -68,15 +68,20 @@
 from errno import EEXIST
 from qiita_core.exceptions import IncompetentQiitaDeveloperError
 from qiita_core.qiita_settings import qiita_config
+from subprocess import check_output
 import qiita_db as qdb

+
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText

 from datetime import timedelta
 import matplotlib.pyplot as plt
 import numpy as np
 import pandas as pd
+from io import StringIO
+from json import loads
+from random import choice
 from scipy.optimize import minimize

 # memory constant functions defined for @resource_allocation_plot
@@ -2673,3 +2678,216 @@ def _resource_allocation_failures(df, k, a, b, model, col_name, type_):
     df[f'c{type_}'] = model(x_plot, k, a, b)
     failures_df = df[df[type_] > df[f'c{type_}']]
     return failures_df
+
+
+def MaxRSS_helper(x):
+    if x[-1] == 'K':
+        y = float(x[:-1]) * 1000
+    elif x[-1] == 'M':
+        y = float(x[:-1]) * 1000000
+    elif x[-1] == 'G':
+        y = float(x[:-1]) * 1000000000
+    else:
+        y = float(x)
+    return y
+
+
+def update_resource_allocation_table(test=None):
+    # Thu, Apr 27, 2023: old allocations (from barnacle) were changed to a
+    # better allocation, so job 1265533 is used as the before/after cutoff
+    # and only jobs from the newest allocation are considered
+    df = pd.DataFrame()
+
+    sql_command = """
+        SELECT
+            pj.processing_job_id AS processing_job_id
+        FROM
+            qiita.software_command sc
+        JOIN
+            qiita.processing_job pj ON pj.command_id = sc.command_id
+        JOIN
+            qiita.processing_job_status pjs
+            ON pj.processing_job_status_id = pjs.processing_job_status_id
+        LEFT JOIN
+            qiita.slurm_resource_allocations sra
+            ON pj.processing_job_id = sra.processing_job_id
+        WHERE
+            pjs.processing_job_status = 'success'
+        AND
+            pj.external_job_id ~ '^[0-9]+$'
+        AND
+            pj.external_job_id::INT >= 1265533
+        AND
+            sra.processing_job_id IS NULL;
+        """ if test is None else """
+        SELECT
+            pj.processing_job_id AS processing_job_id
+        FROM
+            qiita.software_command sc
+        JOIN
+            qiita.processing_job pj ON pj.command_id = sc.command_id
+        JOIN
+            qiita.processing_job_status pjs
+            ON pj.processing_job_status_id = pjs.processing_job_status_id
+        LEFT JOIN
+            qiita.slurm_resource_allocations sra
+            ON pj.processing_job_id = sra.processing_job_id
+        WHERE
+            pjs.processing_job_status = 'success'
+        AND
+            sra.processing_job_id IS NULL;
+        """
+
+    with qdb.sql_connection.TRN:
+        sql = sql_command
+        qdb.sql_connection.TRN.add(sql)
+        res = qdb.sql_connection.TRN.execute_fetchindex()
+        columns = ["processing_job_id"]
+        df = pd.DataFrame(res, columns=columns)
+
+    data = []
+    sacct = ['sacct', '-p', '--format=JobID,ElapsedRaw,MaxRSS,Submit,Start,'
+             'CPUTimeRAW,ReqMem,AllocCPUs,AveVMSize', '-j']
+
+    for index, row in df.iterrows():
+        job = qdb.processing_job.ProcessingJob(row['processing_job_id'])
+        extra_info = ''
+        eid = job.external_id
+        if test is not None:
+            eid = choice([1005932, 1001100])
+            rvals = test(eid)
+        else:
+            try:
+                rvals = check_output(
+                    sacct + [eid]).decode('ascii')
+            except TypeError as e:
+                raise e
+
+        _d = pd.read_csv(StringIO(rvals), sep='|')
+
+        _d['processing_job_id'] = job.id
+        _d['external_id'] = eid
+
+        cmd = job.command
+        s = job.command.software
+        try:
+            samples, columns, input_size = job.shape
+        except qdb.exceptions.QiitaDBUnknownIDError:
+            # this will be raised if the study or the analysis has been
+            # deleted; in other words, the processing_job was run but the
+            # details about it were erased when the user deleted them -
+            # however, we keep the job for the record
+            continue
+        except TypeError as e:
+            # similar to the except above, except that for these 2 commands
+            # we have the study_id as None
+            if cmd.name in {'create_sample_template', 'delete_sample_template',
+                            'list_remote_files'}:
+                continue
+            else:
+                raise e
+        sname = s.name
+
+        if cmd.name == 'release_validators':
+            ej = qdb.processing_job.ProcessingJob(job.parameters.values['job'])
+            extra_info = ej.command.name
+            samples, columns, input_size = ej.shape
+        elif cmd.name == 'complete_job':
+            artifacts = loads(job.parameters.values['payload'])['artifacts']
+            if artifacts is not None:
+                extra_info = ','.join({
+                    x['artifact_type'] for x in artifacts.values()
+                    if 'artifact_type' in x})
+        elif cmd.name == 'Validate':
+            input_size = sum([len(x) for x in loads(
+                job.parameters.values['files']).values()])
+            sname = f"{sname} - {job.parameters.values['artifact_type']}"
+        elif cmd.name == 'Alpha rarefaction curves [alpha_rarefaction]':
+            extra_info = job.parameters.values[
+                ('The number of rarefaction depths to include between '
+                 'min_depth and max_depth. (steps)')]
+
+        # In slurm, each JobID is represented by 3 rows in the dataframe:
+        # - external_id: overall container for the job and its associated
+        #                requests. When the Timelimit is hit, the container
+        #                would take care of completing/stopping the
+        #                external_id.batch job.
+        # - external_id.batch: it's a container job, it provides how
+        #                      much memory it uses and cpus allocated, etc.
+        # - external_id.extern: takes into account anything that happens
+        #                       outside processing but yet is included in
+        #                       the container resources. As in, if you ssh
+        #                       to the node and do something additional or run
+        #                       a prolog script, that processing would be under
+        #                       external_id but separate from external_id.batch
+        # Here we are going to merge all this info into a single row + some
+        # other columns
+
+        def merge_rows(rows):
+            date_fmt = '%Y-%m-%dT%H:%M:%S'
+            wait_time = (
+                datetime.strptime(rows.iloc[0]['Start'], date_fmt)
+                - datetime.strptime(rows.iloc[0]['Submit'], date_fmt))
+            tmp = rows.iloc[1].copy()
+            tmp['WaitTime'] = wait_time
+            return tmp
+
+        curr = _d.groupby(
+            'external_id').apply(merge_rows).reset_index(drop=True)
+
+        row_dict = {
+            'processing_job_id': job.id,
+            'external_id': eid,
+            'sId': s.id,
+            'sName': sname,
+            'sVersion': s.version,
+            'cId': cmd.id,
+            'cName': cmd.name,
+            'samples': samples,
+            'columns': columns,
+            'input_size': input_size,
+            'extra_info': extra_info,
+            'ElapsedRaw': curr['ElapsedRaw'].iloc[0],
+            'MaxRSS': curr['MaxRSS'].iloc[0],
+            'Submit': curr['Submit'].iloc[0],
+            'Start': curr['Start'].iloc[0],
+            'WaitTime': curr['WaitTime'].iloc[0],
+            'CPUTimeRAW': curr['CPUTimeRAW'].iloc[0],
+            'ReqMem': curr['ReqMem'].iloc[0],
+            'AllocCPUS': curr['AllocCPUS'].iloc[0],
+            'AveVMSize': curr['AveVMSize'].iloc[0]
+        }
+
+        data.append(row_dict)
+    df = pd.DataFrame(data)
+
+    # This is important as we are transforming MaxRSS to a raw value,
+    # so we need to confirm that there are no other suffixes
+    print('Make sure that only 0/K/M exist', set(
+        df.MaxRSS.apply(lambda x: str(x)[-1])))
+
+    # Generating new columns
+    df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
+    df['ElapsedRawTime'] = df.ElapsedRaw.apply(
+        lambda x: timedelta(seconds=float(x)))
+
+    for index, row in df.iterrows():
+        with qdb.sql_connection.TRN:
+            sql = """
+                INSERT INTO qiita.slurm_resource_allocations (
+                    processing_job_id,
+                    samples,
+                    columns,
+                    input_size,
+                    extra_info,
+                    memory_used,
+                    walltime_used
+                )
+                VALUES (%s, %s, %s, %s, %s, %s, %s)
+                """
+            to_insert = [
+                row['processing_job_id'], row['samples'], row['columns'],
+                row['input_size'], row['extra_info'], row['MaxRSSRaw'],
+                row['ElapsedRaw']]
+            qdb.sql_connection.TRN.add(sql, sql_args=to_insert)
+            qdb.sql_connection.TRN.execute()
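
For illustration, a hedged sketch (not part of the commit) of how the new function could be called from a maintenance script; the wrapper script is hypothetical, only the import path and the call correspond to the added code.

# Hypothetical maintenance entry point; assumes a configured Qiita install
# with database access and sacct available on the PATH.
from qiita_db.util import update_resource_allocation_table

if __name__ == '__main__':
    # Queries sacct for every successful slurm job that has no row in
    # qiita.slurm_resource_allocations yet and inserts its usage data.
    update_resource_allocation_table()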
