from qiita_core.util import MaxRSS_helper
from qiita_db.software import Software
import datetime
from io import StringIO
from subprocess import check_output
import pandas as pd
from os.path import join

# This is an example script to collect the data we need from SLURM; the plan
# is that in the near future we will clean these up, add them to Qiita's main
# codebase, and then have cronjobs to run them.

# At the time of writing we have:
# qp-spades spades
# (*) qp-woltka Woltka v0.1.4
# qp-woltka SynDNA Woltka
# qp-woltka Calculate Cell Counts
# (*) qp-meta Sortmerna v2.1b
# (*) qp-fastp-minimap2 Adapter and host filtering v2023.12
# ... and the admin plugin
# (*) qp-klp
# Here we are only going to create summaries for the commands marked with (*).


sacct = ['sacct', '-p',
         '--format=JobName,JobID,ElapsedRaw,MaxRSS,ReqMem', '-j']
# for the non-admin jobs we will only use jobs from (roughly) the last six
# months
six_months = datetime.date.today() - datetime.timedelta(weeks=6*4)

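# Note: `sacct -p` prints pipe-delimited rows with a header such as
# "JobName|JobID|ElapsedRaw|MaxRSS|ReqMem|", which is why every block below
# parses its output with pd.read_csv(..., sep='|'). MaxRSS_helper (from
# qiita_core) is assumed here to convert SLURM's human-readable MaxRSS values
# (e.g. '1024K', '2.5G') into plain numbers so that .max() is meaningful; a
# minimal sketch of that kind of conversion (not the actual implementation)
# would be:
#     def _maxrss_to_bytes(value):
#         units = {'K': 2**10, 'M': 2**20, 'G': 2**30, 'T': 2**40}
#         if value[-1].upper() in units:
#             return float(value[:-1]) * units[value[-1].upper()]
#         return float(value)
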
print('The current "software - commands" that use job-arrays are:')
for s in Software.iter():
    if 'ENVIRONMENT="' in s.environment_script:
        for c in s.commands:
            print(f"{s.name} - {c.name}")

# 1. Command: woltka

fn = join('/panfs', 'qiita', 'jobs_woltka.tsv.gz')
print(f"Generating the summary for the woltka jobs: {fn}.")

cmds = [c for s in Software.iter(False)
        if 'woltka' in s.name for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
        j.heartbeat.date() > six_months and j.input_artifacts]

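# These plugin jobs apparently submit two SLURM jobs (a main job-array plus a
# merge step) whose ids are stored together in external_id; both are
# summarized below, tagged as 'main' and 'merge'.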
data = []
for j in jobs:
    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
    jid, mjid = j.external_id.strip().split()
    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    jwt = _d.ElapsedRaw.max()

    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    mwt = _d.ElapsedRaw.max()

    data.append({
        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main',
        'db': j.parameters.values['Database'].split('/')[-1]})
    data.append(
        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge',
         'db': j.parameters.values['Database'].split('/')[-1]})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)
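
# A quick, hypothetical way to inspect the resulting table (not part of the
# data collection itself) would be to check the worst-case memory per
# database:
#     summary = pd.read_csv(fn, sep='\t')
#     print(summary[summary.type == 'main'].groupby('db').mem.max())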

# 2. qp-meta Sortmerna

fn = join('/panfs', 'qiita', 'jobs_sortmerna.tsv.gz')
print(f"Generating the summary for the Sortmerna jobs: {fn}.")

# for Sortmerna we will only use jobs from the last 6 months
cmds = [c for s in Software.iter(False)
        if 'meta' in s.name.lower() for c in s.commands]
jobs = [j for c in cmds if 'sortmerna' in c.name.lower()
        for j in c.processing_jobs if j.status == 'success' and
        j.heartbeat.date() > six_months and j.input_artifacts]

data = []
for j in jobs:
    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
    jid, mjid = j.external_id.strip().split()
    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    jwt = _d.ElapsedRaw.max()

    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    mwt = _d.ElapsedRaw.max()

    data.append({
        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main'})
    data.append(
        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge'})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)


# 3. Adapter and host filtering. Note that a new version was deployed in
# Jan 2024, so the current results may not be the most accurate.

fn = join('/panfs', 'qiita', 'jobs_adapter_host.tsv.gz')
print(f"Generating the summary for the adapter/host filtering jobs: {fn}.")

# for adapter/host filtering we will only use jobs from the last 6 months
cmds = [c for s in Software.iter(False)
        if 'minimap2' in s.name.lower() for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
        j.heartbeat.date() > six_months and j.input_artifacts]

data = []
for j in jobs:
    size = sum([fp['fp_size'] for fp in j.input_artifacts[0].filepaths])
    jid, mjid = j.external_id.strip().split()
    rvals = StringIO(check_output(sacct + [jid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    jmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    jwt = _d.ElapsedRaw.max()

    rvals = StringIO(check_output(sacct + [mjid]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    mmem = _d.MaxRSS.apply(lambda x: x if type(x) is not str
                           else MaxRSS_helper(x)).max()
    mwt = _d.ElapsedRaw.max()

    data.append({
        'jid': j.id, 'sjid': jid, 'mem': jmem, 'wt': jwt, 'type': 'main'})
    data.append(
        {'jid': j.id, 'sjid': mjid, 'mem': mmem, 'wt': mwt, 'type': 'merge'})
df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)


# 4. The SPP!

fn = join('/panfs', 'qiita', 'jobs_spp.tsv.gz')
print(f"Generating the summary for the SPP jobs: {fn}.")

# for the SPP we will look at jobs from the last year
year = datetime.date.today() - datetime.timedelta(days=365)
cmds = [c for s in Software.iter(False)
        if s.name == 'qp-klp' for c in s.commands]
jobs = [j for c in cmds for j in c.processing_jobs if j.status == 'success' and
        j.heartbeat.date() > year]

# for the SPP we need to find the SLURM jobs that were actually run; this
# means looping through the existing SLURM job ids and finding them
max_inter = 2000
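# The loop below scans up to max_inter consecutive SLURM job ids after each
# parent job and matches them to the Qiita job by checking whether the SLURM
# JobName starts with the Qiita job id; 2000 is just a generous upper bound on
# how far apart the related submissions are assumed to be.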

data = []
for job in jobs:
    jei = int(job.external_id)
    rvals = StringIO(
        check_output(sacct + [str(jei)]).decode('ascii'))
    _d = pd.read_csv(rvals, sep='|')
    mem = _d.MaxRSS.apply(
        lambda x: x if type(x) is not str else MaxRSS_helper(x)).max()
    wt = _d.ElapsedRaw.max()
    # the current "easy" way to determine whether a job is amplicon or
    # something else is to check the file extension of the sample sheet
    stype = 'other'
    if job.parameters.values['sample_sheet']['filename'].endswith('.txt'):
        stype = 'amplicon'
    rid = job.parameters.values['run_identifier']
    data.append(
        {'jid': job.id, 'sjid': jei, 'mem': mem, 'stype': stype, 'wt': wt,
         'type': 'main', 'rid': rid, 'name': _d.JobName[0]})

    # let's look for the convert job
    for jid in range(jei + 1, jei + max_inter):
        rvals = StringIO(check_output(sacct + [str(jid)]).decode('ascii'))
        _d = pd.read_csv(rvals, sep='|')
        if [1 for x in _d.JobName.values if x.startswith(job.id)]:
            cjid = int(_d.JobID[0])
            mem = _d.MaxRSS.apply(
                lambda x: x if type(x) is not str else MaxRSS_helper(x)).max()
            wt = _d.ElapsedRaw.max()

            data.append(
                {'jid': job.id, 'sjid': cjid, 'mem': mem, 'stype': stype,
                 'wt': wt, 'type': 'convert', 'rid': rid,
                 'name': _d.JobName[0]})

            # now let's look for the next step: if amplicon that's fastqc,
            # otherwise that's qc/nuqc
            for jid in range(cjid + 1, cjid + max_inter):
                rvals = StringIO(
                    check_output(sacct + [str(jid)]).decode('ascii'))
                _d = pd.read_csv(rvals, sep='|')
                if [1 for x in _d.JobName.values if x.startswith(job.id)]:
                    qc_jid = _d.JobIDRaw.apply(
                        lambda x: int(x.split('.')[0])).max()
                    qcmem = _d.MaxRSS.apply(
                        lambda x: x if type(x) is not str
                        else MaxRSS_helper(x)).max()
                    qcwt = _d.ElapsedRaw.max()

                    if stype == 'amplicon':
                        data.append(
                            {'jid': job.id, 'sjid': qc_jid, 'mem': qcmem,
                             'stype': stype, 'wt': qcwt, 'type': 'fastqc',
                             'rid': rid, 'name': _d.JobName[0]})
                    else:
                        data.append(
                            {'jid': job.id, 'sjid': qc_jid, 'mem': qcmem,
                             'stype': stype, 'wt': qcwt, 'type': 'qc',
                             'rid': rid, 'name': _d.JobName[0]})
                        for jid in range(qc_jid + 1, qc_jid + max_inter):
                            rvals = StringIO(check_output(
                                sacct + [str(jid)]).decode('ascii'))
                            _d = pd.read_csv(rvals, sep='|')
                            if [1 for x in _d.JobName.values if x.startswith(
                                    job.id)]:
                                fqc_jid = _d.JobIDRaw.apply(
                                    lambda x: int(x.split('.')[0])).max()
                                fqcmem = _d.MaxRSS.apply(
                                    lambda x: x if type(x) is not str
                                    else MaxRSS_helper(x)).max()
                                fqcwt = _d.ElapsedRaw.max()
                                data.append(
                                    {'jid': job.id, 'sjid': fqc_jid,
                                     'mem': fqcmem, 'stype': stype,
                                     'wt': fqcwt, 'type': 'fastqc',
                                     'rid': rid, 'name': _d.JobName[0]})
                                break
                    break
            break

df = pd.DataFrame(data)
df.to_csv(fn, sep='\t', index=False)
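
# As with the other summaries, a hypothetical follow-up (not part of this
# script) could be to load the table and look at memory by step and sample
# type, e.g.:
#     spp = pd.read_csv(fn, sep='\t')
#     print(spp.groupby(['type', 'stype']).mem.max())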