-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcompute_intersection.py
More file actions
63 lines (47 loc) · 1.85 KB
/
compute_intersection.py
File metadata and controls
63 lines (47 loc) · 1.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import dask.dataframe as dd
import numpy as np
import pandas as pd
import os
import logging
# Configure the root logger once for the whole script: INFO and above,
# each line prefixed with a timestamp and a left-aligned, padded level name.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)-8s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
import gc
import signal
# hard code args for testing
# args={}
# args['1']='/lambda_stor/homes/brettin/covid19/ML-models/3CLpro.reg.top1.csv'
# args['2']='/lambda_stor/homes/brettin/covid19/ML-models/Enamine_Infer_3CLpro.bin.top1.csv'
# args['out'] = 'intersection.top1'
import argparse
# Command-line interface.
#   --1 / --2 : the two csv files whose rows are intersected
#   --out     : path the merged rows are written to
#   --on      : name of the column both files are joined on; defaults to '2',
#               the key this script previously had hard-coded in the merge
psr = argparse.ArgumentParser(
    description='compute the intersection (inner join) of two csv files')
psr.add_argument('--1', default='file1', help='path to the first csv file')
psr.add_argument('--2', default='file2', help='path to the second csv file')
psr.add_argument('--out', default='intersection.csv',
                 help='path of the csv file the intersection is written to')
psr.add_argument('--on', default='2',
                 help='name of the column, present in both files, to join on')
args=vars(psr.parse_args())

logging.info(str((args)))
logging.info("processing {} and {}".format(args['1'], args['2']))

# extra args to set up headers
#kwargs = {'names' : [0, 1, 2] }
kwargs = {}

# parallel read on csv files
logging.info('reading csv files')
df1 = dd.read_csv(args['1'], **kwargs)
df2 = dd.read_csv(args['2'], **kwargs)
logging.info("done reading csv files {} {}".format(args['1'], args['2']))

# Fail fast: the dask frames already know their column names, so a wrong join
# key is reported here rather than after both files have been fully loaded.
# KeyError is the same exception type pd.merge raises for a missing key.
for csv_path, lazy_frame in ((args['1'], df1), (args['2'], df2)):
    if args['on'] not in lazy_frame.columns:
        raise KeyError("join column {!r} not found in {} (columns: {})".format(
            args['on'], csv_path, list(lazy_frame.columns)))

# Materialize both dask frames as in-memory pandas frames before merging.
logging.info('turn it into a pandas dataframe')
df1=df1.compute()
df2=df2.compute()

logging.info("{:,} rows with {:,} elements".format(df1.shape[0], df1.shape[1]))
logging.info("{:,} rows with {:,} elements".format(df2.shape[0], df2.shape[1]))

# Inner join keeps only the rows whose join-column value occurs in both files.
logging.info("computing intersection on {:,} and {:,} samples".format(df1.shape[0], df2.shape[0]))
s1 = pd.merge(df1, df2, how='inner', on=[args['on']])
logging.info("done computing intersection resulting in {:,} samples".format(s1.shape[0]))

logging.info ('writing csv')
s1.to_csv(args['out'])
logging.info('done writing csv')
# For some reason the program takes a very long time to exit. The working
# assumption is that this is garbage collection, but that is not yet proven.
# gc.set_threshold(0)
# os.kill(os.getpid(), signal.SIGTERM)