-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSCD_06.py
148 lines (93 loc) · 4.47 KB
/
SCD_06.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Databricks notebook source
# MAGIC %md
# MAGIC ####Python Notebook for SCD_06 Implementation in PySpark
# COMMAND ----------
# MAGIC %md
# MAGIC #####Import Libraries
# COMMAND ----------
from pyspark.sql.functions import when,lit,array_contains,col
from pyspark.sql import SparkSession, SQLContext
import datetime
import pandas as pd
# COMMAND ----------
# MAGIC %md
# MAGIC #####Spark Session
# COMMAND ----------
# Build (or reuse) the session. On Databricks a session already exists, so
# getOrCreate() returns the cluster's session; appName only applies if a new
# session is actually created.
spark = SparkSession.builder.appName('SCD_06').getOrCreate()
# Bare expression: Databricks renders the SparkSession object as cell output.
spark
# COMMAND ----------
# MAGIC %md
# MAGIC #####Reading Stores Initial Data Files into DataFrame
# COMMAND ----------
# Initial (day-0) stores snapshot.
# header=True uses the first row as column names; inferSchema=True makes an
# extra pass over the file to guess column types instead of reading all strings.
stores_df = spark.read.csv('/FileStore/tables/stores_1.csv', header=True, inferSchema=True)
display(stores_df)
# COMMAND ----------
# Show the inferred column names/types for a sanity check.
stores_df.printSchema()
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Renaming Columns
# COMMAND ----------
# Rename the mutable attribute columns to their SCD-6 "current_*" names.
# (withColumnRenamed is a no-op if the source column is absent.)
_renames = {"store_city": "current_city", "store_country": "current_country"}
for _old_name, _new_name in _renames.items():
    stores_df = stores_df.withColumnRenamed(_old_name, _new_name)
display(stores_df)
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Implementing SCD_06 Required Columns in DataFrame
# COMMAND ----------
# Seed the SCD-6 bookkeeping columns: the type-3 "previous_*" columns start
# equal to the current values, every row opens on the load date and is the
# active version (end_date = far-future sentinel, current_flag = 'Y').
_start = datetime.datetime(2022, 4, 10).strftime('%Y/%m/%d')
_open_ended = datetime.datetime(9999, 12, 31).strftime('%Y/%m/%d')
stores_df = (
    stores_df
    .withColumn("previous_city", col("current_city"))
    .withColumn("previous_country", col("current_country"))
    .withColumn("start_date", lit(_start))
    .withColumn("end_date", lit(_open_ended))
    .withColumn("current_flag", lit('Y'))
)
# Rearrange columns into the canonical SCD-6 layout.
stores_df = stores_df.select(
    "store_id", "store_name",
    "current_city", "previous_city",
    "current_country", "previous_country",
    "start_date", "end_date", "current_flag",
)
display(stores_df)
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Saving DataFrame to a Hive Table
# COMMAND ----------
# Expose the DataFrame to SQL through a temp view so CTAS can read it.
stores_df.createOrReplaceTempView('stores_tmp')
# BUG FIX: the original removed "dbfs:/user/hive/warehouse/" recursively,
# which wipes the storage of EVERY managed table in the default database.
# Remove only this table's directory (managed Hive tables are stored under
# a lower-cased directory named after the table).
dbutils.fs.rm("dbfs:/user/hive/warehouse/stores_scd", True)
# SQLContext is deprecated since Spark 2.0; SparkSession.sql is the
# supported entry point and needs no extra object.
spark.sql("DROP TABLE IF EXISTS Stores_scd")
spark.sql("CREATE TABLE Stores_scd AS SELECT * from stores_tmp")
# COMMAND ----------
# Re-read the persisted table so the SCD merge below starts from the stored
# state rather than the in-memory DataFrame.
scd_df = spark.read.table("Stores_scd")
display(scd_df)
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Reading Incremented Stores Data into DataFrame
# COMMAND ----------
# Incremental batch of store changes (same raw layout as stores_1.csv:
# store_id, store_name, store_city, store_country).
incremented_df = spark.read.csv('/FileStore/tables/stores_2.csv', header=True, inferSchema=True)
display(incremented_df)
# COMMAND ----------
# MAGIC %md
# MAGIC #####Implementing SCD_06 Calculations.
# COMMAND ----------
# Store_ids already present in the dimension. Collect directly instead of
# the original toPandas() round-trip — no pandas conversion needed for a list.
stores_list = [r["store_id"] for r in scd_df.select("store_id").collect()]
for row in incremented_df.collect():
    # Only stores already in the dimension are processed; brand-new
    # store_ids are ignored (same behaviour as the original notebook).
    if row["store_id"] in stores_list:
        match = (col("store_id") == row["store_id"]) & (col("store_name") == row["store_name"])
        # BUG FIX: SCD type 6 keeps the PRIOR attribute values in the
        # type-3 "previous_*" columns of the newly inserted row. The
        # original wrote the new city/country into previous_* as well,
        # so those columns never held a previous value. Capture the
        # active row's current values before they are overwritten.
        active = (scd_df.filter(match & (col("current_flag") == "Y"))
                        .select("current_city", "current_country")
                        .first())
        prev_city = active["current_city"] if active else row["store_city"]
        prev_country = active["current_country"] if active else row["store_country"]
        # Type-1 part: overwrite current_* on every row of the store,
        # history rows included.
        scd_df = scd_df.withColumn("current_city", when(match, row["store_city"]).otherwise(col("current_city")))
        scd_df = scd_df.withColumn("current_country", when(match, row["store_country"]).otherwise(col("current_country")))
        # Type-2 part: close out the matched rows and drop their flag.
        scd_df = scd_df.withColumn("end_date", when(match, lit(datetime.datetime(2022, 4, 12).strftime('%Y/%m/%d'))).otherwise(col("end_date")))
        scd_df = scd_df.withColumn("current_flag", when(match, lit("N")).otherwise(col("current_flag")))
        # Insert the new active version; previous_* now hold the old values.
        new_row = [(row["store_id"], row["store_name"],
                    row["store_city"], prev_city,
                    row["store_country"], prev_country,
                    datetime.datetime(2022, 4, 12).strftime('%Y/%m/%d'),
                    datetime.datetime(9999, 12, 31).strftime('%Y/%m/%d'), 'Y')]
        columns = ["store_id", "store_name", "current_city", "previous_city",
                   "current_country", "previous_country",
                   "start_date", "end_date", "current_flag"]
        scd_df = scd_df.union(spark.createDataFrame(new_row, columns))
# COMMAND ----------
# MAGIC %md
# MAGIC #####Sort and save DataFrame
# COMMAND ----------
# Order by store, closed-out versions ('N') ahead of the active 'Y' row of
# each store — ascending is sort()'s default for both keys, so plain column
# names suffice.
scd_df = scd_df.sort("store_id", "current_flag")
display(scd_df)
# COMMAND ----------
# MAGIC %md
# MAGIC ##### Save Results to SCD Table
# COMMAND ----------
# NOTE(review): scd_df's lineage traces back to spark.read.table("Stores_scd")
# in an earlier cell, and overwriting a table that is also being read from can
# raise AnalysisException on non-Delta tables — confirm on the target cluster
# (caching/checkpointing scd_df before the write sidesteps it).
scd_df.write.mode("overwrite").saveAsTable("Stores_scd")
# Read back and render the final SCD-6 table.
display(spark.read.table("Stores_scd"))