11import datetime
22
3- from functools import reduce
4-
5- import pandas as pd
6-
73from dateutil .relativedelta import relativedelta
84
95
@@ -39,70 +35,59 @@ def list_dates(start_date, end_date, granularity):
3935 return dates_arr
4036
4137
# For JSON/dict columns: shallow-merge two dicts (later values overwrite earlier ones)
def combine_json(json1, json2):
    """Merge two JSON-style dicts into a new dict.

    Keys from *json2* overwrite keys from *json1*; any input that is not a
    dict is ignored. Neither input is mutated.
    """
    result = {}
    for candidate in (json1, json2):
        if isinstance(candidate, dict):
            result.update(candidate)
    return result
50-
51-
# For set columns: take the union of the two collections
def combine_set(set1, set2):
    """Combine two collections (set or list) into a single set of unique items.

    Lists are converted to sets; any other non-set input is treated as empty.
    """
    def _as_set(value):
        if isinstance(value, set):
            return value
        if isinstance(value, list):
            return set(value)
        return set()

    return _as_set(set1) | _as_set(set2)


class Base:
    """Month-scoped driver that glues an extractor to a collection-specific `klass`."""

    def __init__(self, extractor, month, extra_params, klass):
        # Collaborators are stored as-is; `klass` supplies the
        # collection-specific dataframe behavior (build/dedup/empty).
        self.extractor = extractor
        self.month = month
        self.extra_params = extra_params
        self.klass = klass
def build_dataframe(self):
    """Build the month's rollup dataframe via a fresh `klass` instance.

    Feeds every tarball batch into the instance; returns its rollup, or an
    empty frame when nothing was aggregated (rollup stayed None).
    """
    handler = self.klass()
    handler.from_tarballs(self.iter_batches(handler.TARBALL_NAMES))
    if handler.rollup is None:
        return handler.empty()
    return handler.rollup
7251
def dedup(self, dataframe, hostname_mapping=None, scope_dataframe=None):
    """Delegate deduplication to a fresh `klass` instance, forwarding both optional params."""
    handler = self.klass()
    return handler.dedup(dataframe, hostname_mapping=hostname_mapping, scope_dataframe=scope_dataframe)
7354
def merge_sets(x):
    """Return the union of every collection in the iterable *x* as one set."""
    result = set()
    for item in x:
        result |= set(item)
    return result
def merge_setdicts(x):
    """Fold an iterable of dicts into one dict of value-sets (see combine_json_values)."""
    merged = {}
    for item in x:
        merged = combine_json_values(merged, item)
    return merged


# Helper to combine two JSON values.
# For each key, it builds a set of non-null, non-empty values from both inputs.
def combine_json_values(val1, val2):
    """Merge two dicts into ``{key: set_of_values}``, skipping None and '' values.

    Set values are unioned in; scalar values are added as single elements.
    Non-dict inputs are ignored.
    """
    merged = {}
    for candidate in (val1, val2):
        if not isinstance(candidate, dict):
            continue
        for key, value in candidate.items():
            if value is None or value == '':
                continue
            bucket = merged.setdefault(key, set())
            if isinstance(value, set):
                bucket.update(value)
            else:
                bucket.add(value)
    return merged


def iter_batches(self, names):
    """Yield per-batch tuples of frames for every date of the month.

    *names* are tarball member names (e.g. 'events.csv', 'config.json');
    'config.json' and 'data_collection_status.csv' are requested as optional
    collections. Batches whose non-config frames are all empty are skipped.
    A single-frame batch is yielded bare instead of as a 1-tuple.
    """
    # Keys into each batch dict, in the caller's original order (extensions
    # stripped). BUG FIX: this must be a materialized list — the original lazy
    # map() was exhausted by the first batch's inner loop, so every later batch
    # produced an empty tuple and was silently skipped.
    datas = [n.replace('.csv', '').replace('.json', '') for n in names]

    names = [*names]  # work on a copy; removals below must not affect `datas`
    optional = []
    if 'config.json' in names:
        optional.append('config')
        names.remove('config.json')
    if 'data_collection_status.csv' in names:
        optional.append('data_collection_status')
        names.remove('data_collection_status.csv')

    # Whatever remains are the required collections; None means extractor default.
    collections = [n.replace('.csv', '') for n in names] or None

    for date in self.dates():
        for data in self.extractor.iter_batches(date=date, collections=collections, optional=optional):
            batches = tuple(data[name] for name in datas)

            # Skip batches where every non-config frame is empty.
            nonempty = sum(1 for name, batch in zip(datas, batches)
                           if name != 'config' and not batch.empty)
            if nonempty < 1:
                continue

            yield batches[0] if len(batches) == 1 else batches
10691
10792 def dates (self ):
10893 if self .extra_params .get ('since_date' ) is not None :
@@ -114,85 +99,3 @@ def dates(self):
11499
115100 dates_list = list_dates (start_date = beginning_of_the_month , end_date = end_of_the_month , granularity = 'daily' )
116101 return dates_list
117-
def cast_dataframe(self, df, types):
    """Cast the index level(s) to object dtype, then cast data columns per *types*.

    Handles both a plain single-column index and a composite (MultiIndex).
    """
    if len(self.unique_index_columns()) == 1:
        # Single-column index: cast it directly to object.
        df.index = df.index.astype(object)
    else:
        # Composite index: rebuild every level with object dtype.
        object_levels = [level.astype(object) for level in df.index.levels]
        df.index = df.index.set_levels(object_levels)

    return df.astype(types)
134-
def summarize_merged_dataframes(self, df, columns, operations={}):
    """Collapse `_x`/`_y` suffixed column pairs (from a pandas outer merge) into one column each.

    The reducer per column comes from *operations* ('min', 'max',
    'combine_set', 'combine_json', 'combine_json_values'); columns not listed
    there are summed. The suffixed source columns are dropped afterwards.
    """
    for column in columns:
        left, right = f'{column}_x', f'{column}_y'
        op = operations.get(column)
        if op == 'min':
            df[column] = df[[left, right]].min(axis=1)
        elif op == 'max':
            df[column] = df[[left, right]].max(axis=1)
        elif op == 'combine_set':
            df[column] = df.apply(lambda row: combine_set(row.get(left), row.get(right)), axis=1)
        elif op == 'combine_json':
            df[column] = df.apply(lambda row: combine_json(row.get(left), row.get(right)), axis=1)
        elif op == 'combine_json_values':
            df[column] = df.apply(lambda row: combine_json_values(row.get(left), row.get(right)), axis=1)
        else:
            # Default reducer: element-wise sum of the two halves.
            df[column] = df[[left, right]].sum(axis=1)
        del df[left]
        del df[right]
    return df
152-
def empty(self):
    """Return an empty frame carrying the full expected column layout (index + data columns)."""
    all_columns = [*self.unique_index_columns(), *self.data_columns()]
    return pd.DataFrame(columns=all_columns)
155-
# Multipart collection: merge the dataframes and combine the data columns
def merge(self, rollup, new_group):
    """Fold *new_group* into the running *rollup* (outer merge on the unique key).

    On the first call rollup is None and new_group is returned untouched.
    """
    if rollup is None:
        return new_group

    combined = pd.merge(rollup.loc[:, ], new_group.loc[:, ], on=self.unique_index_columns(), how='outer')
    combined = self.summarize_merged_dataframes(combined, self.data_columns(), operations=self.operations())
    return self.cast_dataframe(combined, self.cast_types())
164-
def dedup(self, dataframe, hostname_mapping=None):
    """Collapse rows whose host_name maps to the same canonical hostname.

    Returns an empty frame for a missing/empty input, and the input unchanged
    when no mapping is supplied.
    """
    if dataframe is None or dataframe.empty:
        return self.empty()
    if not hostname_mapping:
        return dataframe

    # Work on a copy so the caller's frame is untouched; map each hostname to
    # its canonical value, keeping the original where no mapping exists.
    canonical = dataframe.copy()
    canonical['host_name'] = canonical['host_name'].map(hostname_mapping).fillna(canonical['host_name'])

    # Several rows may now share a hostname: regroup, then re-cast to the table schema.
    regrouped = self.regroup(canonical)
    regrouped = self.cast_dataframe(regrouped, self.cast_types())
    return regrouped.reset_index()
183-
@staticmethod
def unique_index_columns():
    """Abstract hook: subclasses return the list of key columns."""
    pass


@staticmethod
def data_columns():
    """Abstract hook: subclasses return the list of data (value) columns."""
    pass


@staticmethod
def cast_types():
    """Abstract hook: subclasses return the dtype mapping for cast_dataframe()."""
    pass


@staticmethod
def operations():
    """Abstract hook: subclasses return the per-column merge operations."""
    pass
0 commit comments