The blog is posted by WeCloudData’s Big Data course student Abhilash Mohapatra.
This story represents an easy path to Transform Data using PySpark. Along with Transformation, Spark Memory Management is also taken care.
Here Freddie-Mac Acquisition and Performance Data from year 1999–2018 is used to create a Single o/p file which can further be used for Data Analysis or Building Machine Learning Models. Lastly a comparison is performed between MapReduce and Spark.
- Service — Data Bricks, EMR.
- Storage — PC, S3.
- Language — PySpark, Python, SAS(Reference).
- Understanding Data and Transformation Logic.
- Spark Parallelism and Job Life Cycle.
- Spark Memory Management.
- Data Processing with Spark-Submit.
- Data Validation.
- Comparison between MapReduce and Spark.
Two I/p files used are of size 145 Gb and o/p file is of 5 Gb for Full Volume. With Sample Files, total I/p and O/p file size is nearly 3Gb. O/p file is similar to Acquisition file, but with more columns like Loan Default Status, Default UPB, etc. from Performance Data.
1. Understanding Data and Transformation Logic.
Data is collected from Freddie-Mac website using a simple python script. It Scrapped website, Downloaded and Unzipped file into local PC. Unzipped files were then uploaded to S3 for further processing.
Acquisition File is having 27 attributes and Performance with 26 attributes. Please follow the Copy-Book link for more details. Transformation logic is extracted from SAS code provided in Freddie Mac website and decoded into PySpark. A total of 34 new attributes are extracted from Performance data using Window , Lag and UDF’s making total of 61 attributes in o/p file.
All input data collected from Freddie-Mac Web, present in S3 is of String Type. Hence Type Casting is performed accordingly in PySpark Script before actual processing.
8 new intermediate files were created from Performance Data by various transformation logics. At last, all the 8 new files are joined with Acquisition data to create the final o/p with 61 attributes.
Used DataBricks for initial PySpark code development with Sample data(3GB) and finally executed the script in EMR with Full(145 GB)data.
In DataBricks community edition(1 EC2 instance with 8Vcores 6Gb),it took around 2.16 Hrs to process 3GB of Sample data.
AWS EMR(1 Master +10 Core nodes each with 16Vcores 30Gb) took 19 Mins to Process Sample Data and 2.6 Hrs for Full Data While Persisting and 4.9 Hrs without Persisting Intermediate Files.
Git Link for all the necessary SAS, DataBricks, Python & PySpark Codes.
2. Spark Parallelism and Job Life Cycle.
When a normal python job is submitted, the programme executes in a Single Vcore though others Vcores are available. When a similar python programme but with Scikit-learn Grid Search CV is used with njobs=-1 , all the available cores are used by the single job to execute different search jobs parallelly.
The below image depicts similar execution. Windows Processor 4 is showing a Simple Python script and 5 a Grid Search both executing at same time.
When a Spark Job is Submitted in Cluster, All the Available Nodes and All the Vcores in each Node work parallelly to perform certain task. The below diagram represents a spark flow to read a csv file, perform transformation and save data to disk.
When we submit a Spark Job, The Job is divided into different Stages. Each Stage is further divided into Tasks. At a certain Time, An Executor Performs Certain Number Of Tasks with Maximum Tasks= No. Of Executor Cores depending on Task and Executor memory size.
From the above diagram, its also clear that data is never touched to Driver/Master Node. Data is Pulled from source, Processed by Executors and Delivered to Destination.
3. Spark Memory Management.
Default Spark Memory Distribution is as shown in below diagram. At any time, Total memory requested to YARN by Spark will be Executors Memory + Overhead Memory. Hence 10% overhead memory should always be kept in mind when modifying spark.executor.memory. To have control over overhead memory, parameter spark.executor.memoryOverhead can be used.
Here an EMR cluster with 1 Master Node and 10 Worker Nodes is to be started. Total Available Memory/Cores Distribution in Cluster and Spark is shown below.
As data is only to be transformed here, spark.storage.memoryFraction=0.1 & spark.shuffle.memoryFraction=0.9 is used. The below diagram depicts Spark Distribution with different number of Executor Cores. Total available memory is used interchangeably by Spark storage and shuffle whenever necessary. The above sets the limit when to evict any data to disk if there is shortage of memory.
Another way to have control over Memory is the number of Partitions that our data is divided into for parallel processing by Spark called RDD’s. This is because, at certain time, a Spark Executor performs a certain Task on certain RDD/Partition. Hence each partition should carry data according to Executors Memory to avoid errors.
Data partitions is controlled by parameters like spark.default.parallelism, spark.sql.shuffle.partition, and repartiton()/coalesce(). spark.default.parallelism represents the number of RDD partitions to be used while reading data from input source, while spark.sql.shuffle.partition represents the number of RDD partitions to be used while shuffling data.
Through out the transformations for above job, whenever data is read from Input(S3), Number of Tasks = Number of Partitions = Set According to Default spark.default.parallelism, which varies according to data size. Rest all of the time, Number of Tasks = Number of Partitions = spark.sql.shuffle.partition set in spark-submit.
Through out the transformation for above job, Whenever data is read from Input(S3), Number of Tasks = Number of Partitions = Set According to spark.default.parallelism set in spark-submit for Large File, but reshaped by spark for Smaller File. Rest all of the time, Number of Tasks = Number of Partitions = spark.sql.shuffle.partition set in spark-submit.
Data serialization is the process of converting data objects present in complex data structures into a byte stream for storage, transfer and distribution. Data Serialization plays an important role in the performance of any distributed application. This can be controlled by parameter spark.serializer. As suggested by Apache, Serializer “org.apache.spark.serializer.KryoSerializer” is faster than default Java Serializer.
Intermediate Data can be cached to RAM or Disk using persist()/cache(). By using persist we can have Control over where the data is cached like Disk, RAM or Both. Testing is performed once with persisting intermediate data Only to Disk and other without. The time taken to process the whole data was nearly doubled when intermediate files were not persisted.
While reading/writing data to database, repartiton()/coalesce() is performed if data size is huge. This is to avoid large number of connections to database at the same time.
Spark Tries Multiple Times after any task failure before giving up on the job. This can be set by spark.task.maxFailures. Default is 4. Spark was able to connect to S3 within 4 tries, else the whole job would have failed.
4. Data Processing.
While processing Sample Data of size nearly 3Gb in DataBricks & EMR, default settings worked well. But when processing all 145Gb of Data with a certain AWS EMR Cluster Size, above parameters were tweaked for successful Job execution.
spark.dynamicAllocation.enabled is set to True by AWS in default which initializes a number of parameters to default. spark.sql.shuffle.partition & spark.default.parallelism = 3200 after memory failures with 400/800. spark.shuffle.memoryFraction = 0.9 & spark.storage.memoryFraction = 0.1 were set, as here only Transformation is performed. As suggested by spark, spark.serialize = “org.apache.spark.serializer.KryoSerializer” being faster as suggested by Apache.
The intermediate files created while processing full volume is 81Gb, So intermediate DataFrames were persisted to Disk-Only. This saves 50% of processing time. coalesce(20) while writing output to restrict large number of connections to S3 at the same time. These two changes were performed in PySpark script.
With this cluster size, In AWS by default spark.dynamicAllocation.enabled is set to True. With this setup spark.executor.memory is set to 4743M = 4.6g.
With this cluster, In AWS when maximizeResourceAllocation is set to True, spark.executor.memory is set to 20131M = 19.6g.
Hence in default mode, if we only manually change executor cores, there is no change in executor memory allocation leading to less number of executors though full memory is used , resulting in less parallelism. Hence when using in dynamicAllocation or maximizeResourceAllocation mode, if configuration parameters are modified, need to be done very carefully.
Test-1: spark.dynamicAllocation.enabled=true, spark.executor.cores=4
Test-2: maximizeResourceAllocation=true, spark.executor.cores=16
The below diagram represents Executor State at any point of time. As there are 16 cores per executor, at any time, Maximum tasks an Executor can perform =16. Here 10 tasks were performed while image was captured. Number of Tasks depends on Allocated Executor Cores, Memory and Task Size.
In AWS for this cluster, Once maximizeResourceAllocation is set to true, spark.executor.memory is also automatically set to 20131M = 19.6g. Then, If spark.executor.cores is modified without modifying spark.executor.memory, maximum one executor is allocated per node due to memory constraint(19.6g) irrespective of number of cores assigned per executor.
In Final Run for maximum parallelization, started a Fresh Cluster and used spark-submit with same configuration as Test Case — 1.
The transformation job completed successfully after 2.5 hrs. Total no of files written to S3 were 20 as set by coalesce in script with record count exactly the same as of Acquisition File. There is still definitely many scope of optimization both in PyScript and Spark Configuration.
5. Data Validation.
Total 20 output files were created, as coalesce is used just before writing to S3.
Acquisition File and Total Loan Count.
Processed File and Total Loan Count.
While reading processed file, header is taken care as each of the 20 files is associated with a header record.
6. Comparison between MapReduce and Spark.
Same data was processed using Hive, which in turn uses MapReduce for operations. While pre-processing in Hive only ordering was performed based on loan-id and period.
It took around 3.5 hrs to Order and Load only performance data to Hive.
Link for Data Pipeline with Hive in EMR. Here same Freddie-Mac data is processed but with only Date Transformation.
Here in Spark, though we created a larger cluster than before, but also heavy lifting is performed. Operations like Windowing, Lagging, Mapping and Joins with 9 different intermediate tables are performed in Spark with only 2.5 hrs even with much larger records.
The main reason for difference in performance is that MapReduce writes intermediate data to Disk during operations, while Spark uses Memory(RAM). Writing to disk is definitely a costly operation.
One more observation, about Persist in Spark. Here while processing in Spark, nearly 50% of time is saved by persisting to Disk only large intermediate files. Persist is used due to limitations of personal cluster RAM size. But in MapReduce both large and smaller files are written to Disk making it slower.
This marks the end of processing all the Two Decades Freddie-Mac Single Family Loan Data using PySpark with AWS.
This o/p processed file can further be used for Single Family Home Loan Data Analysis and Building Loan Risk Model using Machine Learning.
*Except total-loss-incurred, tried to put all other transformations from SAS.
*Spark Default Settings should be set carefully while working in shared cluster or with single application for maximum resource utilization.
*Here as the data size is know, so memory calculation was performed with accuracy along with partitions. When input data size is not available and is very large, maximizeResourceAllocation or dynamicAllocation can be used depending on environment along with broadcasting the joins forcefully.
*AWS clusters will incur charges, But DataBricks Community Edition can be used to work with sample files for Free.
Git Link for all the necessary SAS, DataBricks, Python & PySpark Codes.
Thanks For Reading. Questions and Suggestions are welcome.
To find out more about the courses our students have taken to complete these projects and what you can learn from WeCloudData, view the learning path. To read more posts from Abhi, check out his Medium posts here.