Skip to content

Commit 96fbc5f

Browse files
Added support for running the benchmarks on AWS EMR
1 parent 767abd9 commit 96fbc5f

5 files changed

+110
-0
lines changed

aws-emr/build-test-data-emr.sh

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
# Launch a transient AWS EMR cluster that runs the PySpark benchmark
# data-generation job, then terminates itself when the step finishes.
#
# Exit immediately if the aws CLI call fails.
set -e

# Account-specific placeholders. Override via the environment, e.g.:
#   S3_BUCKET=s3://my-bucket KEY_PAIR=my-key ./build-test-data-emr.sh
# Defaults match the original placeholder values.
S3_BUCKET="${S3_BUCKET:-s3://your-s3-bucket}"
KEY_PAIR="${KEY_PAIR:-your-key-pair}"

# NOTE(review): BidPrice=OnDemandPrice requests Spot capacity capped at the
# on-demand rate — presumably intentional for cost savings; confirm.
aws emr create-cluster --name "Pyspark Benchmark - Generate Data" \
    --release-label emr-5.29.0 \
    --applications Name=Spark \
    --log-uri "${S3_BUCKET}/logs/" \
    --ec2-attributes KeyName="${KEY_PAIR}" \
    --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=r5d.xlarge,BidPrice=OnDemandPrice InstanceGroupType=CORE,InstanceCount=6,InstanceType=r5d.2xlarge,BidPrice=OnDemandPrice \
    --bootstrap-actions Path="${S3_BUCKET}/emr_bootstrap.sh" \
    --steps Type=Spark,Name="Pyspark Benchmark - Generate Data",\
ActionOnFailure=CONTINUE,\
Args=[--deploy-mode,cluster,--master,yarn,${S3_BUCKET}/jobs/generate-data.py,${S3_BUCKET}/data/,-r,2000000000,-p,1000] \
    --use-default-roles \
    --auto-terminate
17+

aws-emr/emr_bootstrap.sh

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/bash
# EMR bootstrap action: install the Python packages the benchmark jobs need
# on every cluster node before any steps run.
#
# Fail fast so a broken install surfaces as a bootstrap failure instead of a
# confusing error later in the Spark step.
set -e

# -U upgrades any preinstalled versions; sudo is needed because the EMR AMI's
# system Python site-packages is root-owned.
sudo pip install -U \
    matplotlib \
    pandas

aws-emr/run-benchmarks-emr.sh

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#!/bin/bash
# Launch two transient AWS EMR clusters, one per benchmark (shuffle, CPU).
# Each cluster runs a single Spark step and auto-terminates when done.
#
# Exit immediately if any aws CLI call fails.
set -e

# Account-specific placeholders. Override via the environment, e.g.:
#   S3_BUCKET=s3://my-bucket KEY_PAIR=my-key ./run-benchmarks-emr.sh
# Defaults match the original placeholder values.
S3_BUCKET="${S3_BUCKET:-s3://your-s3-bucket}"
KEY_PAIR="${KEY_PAIR:-your-key-pair}"

# launch_benchmark NAME STEP_ARGS
# Create one transient EMR cluster running the given Spark step. The cluster
# shape, release label, bootstrap action, and IAM roles are identical for
# every benchmark, so they live here instead of being duplicated per call.
launch_benchmark() {
    local name="$1"
    local step_args="$2"

    # NOTE(review): BidPrice=OnDemandPrice requests Spot capacity capped at
    # the on-demand rate — presumably intentional for cost savings; confirm.
    aws emr create-cluster --name "${name}" \
        --release-label emr-5.29.0 \
        --applications Name=Spark \
        --log-uri "${S3_BUCKET}/logs/" \
        --ec2-attributes KeyName="${KEY_PAIR}" \
        --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=r5d.xlarge,BidPrice=OnDemandPrice InstanceGroupType=CORE,InstanceCount=6,InstanceType=r5d.2xlarge,BidPrice=OnDemandPrice \
        --bootstrap-actions Path="${S3_BUCKET}/emr_bootstrap.sh" \
        --steps Type=Spark,Name="${name}",ActionOnFailure=CONTINUE,Args=[${step_args}] \
        --use-default-roles \
        --auto-terminate
}

launch_benchmark "Pyspark Benchmark - Shuffle" \
    "--deploy-mode,cluster,--master,yarn,${S3_BUCKET}/jobs/benchmark-shuffle.py,${S3_BUCKET}/data/,-r,250,-n,'pyspark-benchmark-shuffle',-o,${S3_BUCKET}/results/pyspark-shuffle"

launch_benchmark "Pyspark Benchmark - CPU" \
    "--deploy-mode,cluster,--master,yarn,${S3_BUCKET}/jobs/benchmark-cpu.py,${S3_BUCKET}/data/,-s,25000000000,-p,1000,-n,'pyspark-benchmark-cpu',-o,${S3_BUCKET}/results/pyspark-cpu"

benchmark-cpu.py

+29
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ def parseArguments():
6363
dest='appName',
6464
help='The name given this PySpark job'
6565
)
66+
arguments.add_argument(
67+
'-o', '--results-output',
68+
metavar='results-file-path',
69+
type=str,
70+
default=None,
71+
dest='results_output_file',
72+
help='The file path to place the results output'
73+
)
6674
return arguments.parse_args()
6775

6876
def benchmarkSHA256(df, jobLogger):
@@ -187,6 +195,27 @@ def main():
187195
joblogger.info('')
188196
joblogger.info('****************************************************************************')
189197

198+
if args.results_output_file is not None:
199+
joblogger.info('')
200+
joblogger.info('Writing results to {0}'.format(args.results_output_file))
201+
202+
results_list = [
203+
('sha-512',sha256_time),
204+
('calc-pi-python-udf',calcPi_time),
205+
('calc-pi-dataframe',calcPi_DF_time),
206+
]
207+
208+
results_schema = T.StructType([
209+
T.StructField("test", T.StringType()),
210+
T.StructField("seconds", T.DoubleType())
211+
])
212+
results_df = spark.createDataFrame(results_list, schema=results_schema).coalesce(1)
213+
results_df.write.csv(
214+
args.results_output_file,
215+
header=True,
216+
mode='overwrite'
217+
)
218+
190219

191220
if __name__ == '__main__':
192221
main()

benchmark-shuffle.py

+30
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ def parseArguments():
5454
dest='repartitions',
5555
help='The number of partitions to use in repartition benchmark'
5656
)
57+
arguments.add_argument(
58+
'-o', '--results-output',
59+
metavar='results-file-path',
60+
type=str,
61+
default=None,
62+
dest='results_output_file',
63+
help='The file path to place the results output'
64+
)
5765
return arguments.parse_args()
5866

5967
def benchmarkGroupBy(df, jobLogger):
@@ -221,5 +229,27 @@ def main():
221229
joblogger.info('')
222230
joblogger.info('**********************************************************************')
223231

232+
if args.results_output_file is not None:
233+
joblogger.info('')
234+
joblogger.info('Writing results to {0}'.format(args.results_output_file))
235+
236+
results_list = [
237+
('group-by',groupBy_time),
238+
('repartition',repartition_time),
239+
('inner-join',innerJoin_time),
240+
('broadcast-inner-join',broadcastInnerJoin_time),
241+
]
242+
243+
results_schema = T.StructType([
244+
T.StructField("test", T.StringType()),
245+
T.StructField("seconds", T.DoubleType())
246+
])
247+
results_df = spark.createDataFrame(results_list, schema=results_schema).coalesce(1)
248+
results_df.write.csv(
249+
args.results_output_file,
250+
header=True,
251+
mode='overwrite'
252+
)
253+
224254
if __name__ == '__main__':
225255
main()

0 commit comments

Comments (0)