1- from pyspark .sql .functions import col , lit , concat_ws
1+ from pyspark .sql .functions import (
2+ col ,
3+ concat_ws ,
4+ lit ,
5+ )
26
37from datacustomcode .client import Client
48from datacustomcode .io .writer .base import WriteMode
@@ -8,45 +12,55 @@ def main():
812 client = Client ()
913
1014 employees = client .read_dlo ("Employee__dll" ).persist ()
11- employees = employees .select (
12- "id__c" ,
13- "manager_id__c" ,
14- "name__c" ,
15- "position__c"
16- )
15+ employees = employees .select ("id__c" , "manager_id__c" , "name__c" , "position__c" )
1716 employees .show ()
18- employees_with_manager = employees .alias ("e" ) \
19- .join (employees .alias ("m" ), col ("e.manager_id__c" ).cast ("string" ) == col ("m.id__c" ).cast ("string" ), "left" ) \
17+ employees_with_manager = (
18+ employees .alias ("e" )
19+ .join (
20+ employees .alias ("m" ),
21+ col ("e.manager_id__c" ).cast ("string" ) == col ("m.id__c" ).cast ("string" ),
22+ "left" ,
23+ )
2024 .select (
2125 col ("e.id__c" ),
2226 col ("e.name__c" ),
2327 col ("e.position__c" ),
2428 col ("e.manager_id__c" ),
25- col ("m.name__c" ).alias ("manager_name__c" )
26- ).persist ()
29+ col ("m.name__c" ).alias ("manager_name__c" ),
30+ )
31+ .persist ()
32+ )
2733
28- hierarchy_df = employees_with_manager .filter (col ("manager_id__c" ).isNull ()) \
29- .withColumn ("hierarchy_level__c" , lit (1 )) \
30- .withColumn ("management_chain__c" , col ("name__c" )).persist ()
34+ hierarchy_df = (
35+ employees_with_manager .filter (col ("manager_id__c" ).isNull ())
36+ .withColumn ("hierarchy_level__c" , lit (1 ))
37+ .withColumn ("management_chain__c" , col ("name__c" ))
38+ .persist ()
39+ )
3140
3241 current_level = 1
3342
3443 while True :
3544 ewm = employees_with_manager .alias ("ewm" )
36- hdf = hierarchy_df .filter (col ("hierarchy_level__c" ) == current_level ).alias ("hdf" )
45+ hdf = hierarchy_df .filter (col ("hierarchy_level__c" ) == current_level ).alias (
46+ "hdf"
47+ )
3748
38- next_level_df = (ewm
39- .join (hdf , col ("ewm.manager_id__c" ).cast ("string" ) == col ("hdf.id__c" ).cast ("string" ) , "inner" )
40- .select (
41- col ("ewm.id__c" ),
42- col ("ewm.name__c" ),
43- col ("ewm.position__c" ),
44- col ("ewm.manager_id__c" ),
45- col ("ewm.manager_name__c" ),
46- (col ("hdf.hierarchy_level__c" ) + 1 ).alias ("hierarchy_level__c" ),
47- concat_ws (" | " , col ("hdf.management_chain__c" ), col ("ewm.name__c" )).alias ("management_chain__c" )
48- )
49- )
49+ next_level_df = ewm .join (
50+ hdf ,
51+ col ("ewm.manager_id__c" ).cast ("string" ) == col ("hdf.id__c" ).cast ("string" ),
52+ "inner" ,
53+ ).select (
54+ col ("ewm.id__c" ),
55+ col ("ewm.name__c" ),
56+ col ("ewm.position__c" ),
57+ col ("ewm.manager_id__c" ),
58+ col ("ewm.manager_name__c" ),
59+ (col ("hdf.hierarchy_level__c" ) + 1 ).alias ("hierarchy_level__c" ),
60+ concat_ws (" | " , col ("hdf.management_chain__c" ), col ("ewm.name__c" )).alias (
61+ "management_chain__c"
62+ ),
63+ )
5064
5165 if next_level_df .isEmpty ():
5266 break
0 commit comments