
Commit d190722

added progs for chapter 5
1 parent 7c6e27d commit d190722

2 files changed: +108 −0 lines changed

Diff for: code/chap05/python/customers_with_date.txt (+23)

@@ -0,0 +1,23 @@
+customer_id,date,transaction_id,transaction_value
+c1,2/9/2019,T0011,20.67
+c1,2/9/2019,T0012,12.34
+c1,3/16/2019,T0013,30.67
+c1,3/16/2019,T0014,42.34
+c1,4/24/2019,T0023,48.30
+c1,4/24/2018,T0051,28.67
+c1,4/25/2019,T0043,42.30
+c1,4/25/2018,T0091,29.67
+c1,1/3/2018,T0002,12.34
+c1,4/30/2018,T0003,44.30
+c2,2/8/2019,T0511,20.67
+c2,2/8/2019,T0612,12.34
+c2,2/9/2019,T0061,20.67
+c2,2/9/2019,T0062,12.34
+c2,3/16/2019,T0513,30.67
+c2,3/16/2019,T0014,42.34
+c2,4/24/2019,T0023,48.30
+c2,4/24/2018,T0051,28.67
+c2,4/25/2019,T0043,42.30
+c2,4/25/2018,T0091,29.67
+c2,1/3/2018,T0002,12.34
+c2,4/30/2018,T0003,44.30
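
Note that the dates in this sample are month/day/year (e.g., 3/16/2019 is March 16, 2019, since 16 cannot be a month). A quick standalone check of the year/month extraction used by the script below, runnable as plain Python without Spark:

    # plain-Python mirror of the script's UDF logic (sanity check only)
    def get_year(date_as_str):
        return int(date_as_str.split("/")[2])   # "4/24/2019" -> 2019

    def get_month(date_as_str):
        return int(date_as_str.split("/")[0])   # "4/24/2019" -> 4

    assert get_year("3/16/2019") == 2019
    assert get_month("3/16/2019") == 3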
Diff for: code/chap05/python/… (+85)

@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+#-----------------------------------------------------
+# 1. Read customers_with_date.txt
+# 2. Create a DataFrame with 4 columns:
+#    {
+#      <customer_id>,
+#      <date>,
+#      <transaction_id>,
+#      <transaction_value>
+#    }
+#
+#    <date> as month/day/year
+#
+# 3. Partition data by (<year>, <month>)
+#
+# sample input: customers_with_date.txt
+#
+#-------------------------------------------------------
+# @author Mahmoud Parsian
+#-------------------------------------------------------
+from __future__ import print_function
+import sys
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import udf
+from pyspark.sql.types import IntegerType
+
+#-------------------------------------
+# date_as_str: month/day/year
+@udf(returnType=IntegerType())
+def get_year(date_as_str):
+    # the year is the third token: "4/24/2019" -> 2019
+    tokens = date_as_str.split("/")
+    return int(tokens[2])
+#end-def
+#-------------------------------------
+# date_as_str: month/day/year
+@udf(returnType=IntegerType())
+def get_month(date_as_str):
+    # the month is the first token: "4/24/2019" -> 4
+    tokens = date_as_str.split("/")
+    return int(tokens[0])
+#end-def
+#-------------------------------------
+# main program:
+#
+# define input path
+input_path = sys.argv[1]
+print("input_path=", input_path)
+
+# define output path for partitioned data
+output_path = sys.argv[2]
+print("output_path=", output_path)
+
+# create a SparkSession object
+spark = SparkSession.builder.getOrCreate()
+
+# create a DataFrame; the input file has a header row,
+# and toDF() returns a new DataFrame with the given column names
+# columns = ('customer_id', 'date', 'transaction_id', 'transaction_value')
+df = spark.read.option("header", "true")\
+    .option("inferSchema", "true")\
+    .csv(input_path)\
+    .toDF('customer_id', 'date', 'transaction_id', 'transaction_value')
+#
+df.show(truncate=False)
+df.printSchema()
+#
+# add 2 new columns: year and month
+df2 = df.withColumn('year', get_year(df.date))\
+    .withColumn('month', get_month(df.date))
+#
+df2.show(truncate=False)
+df2.printSchema()
+#
+# partition data by 'year', and then by 'month';
+# each partition will have one or more files
+df2.write.partitionBy('year', 'month')\
+    .parquet(output_path)
+
+# read the partitioned data back into another DataFrame
+df3 = spark.read.parquet(output_path)
+df3.show(truncate=False)
+df3.printSchema()
+
+# done!
+spark.stop()
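
To run the script, pass the input file and an output directory to spark-submit; the script filename used here is only an assumption, not part of this commit view:

    spark-submit partition_by_year_month.py customers_with_date.txt /tmp/partitioned

With this sample input, partitionBy('year', 'month') writes one Hive-style directory per distinct (year, month) pair, plus a _SUCCESS marker file:

    /tmp/partitioned/year=2018/month=1/part-*.parquet
    /tmp/partitioned/year=2018/month=4/part-*.parquet
    /tmp/partitioned/year=2019/month=2/part-*.parquet
    /tmp/partitioned/year=2019/month=3/part-*.parquet
    /tmp/partitioned/year=2019/month=4/part-*.parquet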
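A side benefit of this layout is partition pruning: filtering on the partition columns lets Spark read only the matching directories. A minimal sketch, reusing the script's output_path (before spark.stop() is called):

    # read only April 2019; Spark prunes the other
    # year=/month= directories instead of scanning them
    april_2019 = spark.read.parquet(output_path) \
        .where("year = 2019 AND month = 4")
    april_2019.show(truncate=False)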
