Lynn Langit

Oct 4, 2018

9 min read

Scaling Custom Machine Learning on AWS — Part 2 EMR

Scaling VariantSpark

This is part two of a multi-part series detailing work with the team at CSIRO Bioinformatics on scaling their custom machine learning library, VariantSpark, on the AWS cloud.

VariantSpark is built in Scala on Apache Spark core. VariantSpark implements a custom machine learning (RandomForest) algorithm, which includes splitting extremely wide input data for analysis. Although it was built for use with genomic data, VariantSpark can also be used with other types of wide input data. This input data can include up to 100 million features. You may wish to read part one of this blog series if you are new to VariantSpark.

At this point in the project we decided to test a number of AWS services for their suitability for scaling VariantSpark jobs. These services provide compute resources in one of two ways — either as managed virtual machines or as Docker containers. The AWS services we considered are listed below:

  • EMR (Elastic MapReduce) — AWS service providing a managed Hadoop/Spark environment on a cluster of EC2 virtual machines.
  • EKS (Elastic Kubernetes Service) — AWS service for orchestrating Docker containers using Kubernetes, all running on managed EC2 VMs.
  • SageMaker — AWS service for managed Docker containers for machine learning, again running on managed clusters of EC2 instances.

Goal 1: Select One — VMs or Containers

We decided to start by working with virtual machines, rather than containers. To that end, we chose AWS EMR (Elastic MapReduce), as it provides a managed Hadoop/Spark environment on a cluster of EC2 instances.

These managed VMs closely resemble the team's current on-premises execution environment, so we felt this was the most logical place to begin our testing, rather than starting with containers.

We preferred managed Hadoop/Spark clusters over raw EC2 VMs, given the limited dedicated DevOps resources available on the CSIRO team at the start of this project.

As a preparatory step, the team ran the largest workload ('Huge 2' — see below) on the on-premises Hadoop/Spark cluster so that we would have a current baseline to compare against. We used a VariantSpark 2.x jar file compiled against Apache Spark 2.2 for this phase of our testing.

Before we examine the details of our AWS EMR VariantSpark testing, let’s review the sample input data that we used for this purpose.

Understanding Example Input Data

Genomic data is presented in several different formats. VariantSpark is designed to work with a variety of these — supporting .csv, .vcf, .bz2 and parquet. We used the most optimized format (parquet) for our scaling tests. Although we tested with a variety of datasets, we'll focus on the two largest synthetic datasets ('Huge 1' and 'Huge 2') in this blog post. Each job run uses two types of files — an input file and a label file. Our input parquet files are partitioned into files of roughly 1 GB each (a small sketch of producing such a layout follows the job list below).

Job Types — # rows (samples) * # features (columns) → Total file size

  • Huge 2 — big GWAS — 10k samples * 50M features → 221 GB
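As an aside, here is a minimal PySpark sketch of how wide input data might be written out as parquet files of roughly 1 GB each. The paths and the partition count are placeholders; estimating the partition count from the input size is our own assumption, not the team's actual data-preparation pipeline:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("prepare-variantspark-input").getOrCreate()

# Placeholder paths; the real data lives in the team's S3 buckets
df = spark.read.csv("s3://<your-bucket>/raw/input.csv", header=True)

# Aim for ~1 GB per parquet file: pick a partition count close to the total size in GB.
# For a 221 GB dataset, that works out to roughly 220 partitions.
num_partitions = 220
(df.repartition(num_partitions)
   .write.mode("overwrite")
   .parquet("s3://<your-bucket>/prepared/input.parquet"))
```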

Goal 2: Setup EMR for Testing

To get started, we designed a core set of cluster setup instructions for VariantSpark-on-EMR. Here are the instructions — link. You'll note that VariantSpark includes a client tool, 'vs-emr', which augments the AWS CLI for reproducible, scripted installation.

Shown below is a VS-EMR architecture diagram. You'll note that the key components of our setup are the EMR cluster itself and a number of S3 buckets. The S3 buckets hold both the source data (for analysis) and the cluster configuration files (bootstrap scripts, etc.).

We also included the Jupyter notebook service, because we find working in notebooks preferable to using the command line for our scaling experiments.

VariantSpark on AWS EMR configuration
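To give a concrete picture of this architecture, here is a minimal sketch of how a cluster like this might be requested programmatically with boto3, with a bootstrap action pulled from S3. The bucket names, script paths, EMR release label, and instance counts are placeholders, not the team's actual configuration (the team used the vs-emr tool plus the EMR console and CLI):

```python
import boto3

emr = boto3.client("emr")  # assumes AWS credentials and region are configured

# Illustrative cluster request; all names, paths, and counts below are placeholders
response = emr.run_job_flow(
    Name="variantspark-test-cluster",
    ReleaseLabel="emr-5.17.0",                       # placeholder EMR release
    Applications=[{"Name": "Spark"}, {"Name": "Ganglia"}],
    LogUri="s3://<your-bucket>/emr-logs/",
    BootstrapActions=[
        {
            "Name": "install-variantspark",
            "ScriptBootstrapAction": {
                "Path": "s3://<your-bucket>/bootstrap/install-variantspark.sh"
            },
        }
    ],
    Instances={
        "InstanceGroups": [
            {"Name": "Master", "InstanceRole": "MASTER",
             "InstanceType": "r4.16xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE",
             "InstanceType": "r4.16xlarge", "InstanceCount": 4},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)
print(response["JobFlowId"])
```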

Of note: we attempted to use AWS CloudFormation to automate test runs of EMR clusters. We ultimately chose not to continue using CloudFormation, as it did not provide us with value at this point in our testing.

Goal 3: Plan Configuration & Parameters

Next, we worked to make it fast and easy to test VS-EMR at scale. We had prepared our sample data and bootstrap scripts. However, we still had to tackle a significant challenge — determining the test configuration(s).

For VS-EMR, there are FIVE categories of configuration settings.

Configuration setting types for VS-EMR

To start, we worked to compare potential sets of configurations against the job runs from the on-premises cluster. To give a sense of the complexity of this phase of our work, we've included some sample configuration screens below. First is an example EMR cluster configuration, shown as output from the 'aws cli' tool:

EMR cluster parameters
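Cluster parameters like these can also be retrieved programmatically. A minimal sketch with boto3 follows; the cluster ID is a placeholder:

```python
import boto3

emr = boto3.client("emr")  # assumes AWS credentials and region are configured

# Placeholder cluster ID; use the ID returned when the cluster was created
cluster = emr.describe_cluster(ClusterId="j-XXXXXXXXXXXXX")["Cluster"]
print(cluster["Name"], cluster["Status"]["State"], cluster["ReleaseLabel"])
```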

Next is an example of the EMR cluster Apache Spark configuration:

EMR Spark configuration
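For context, EMR Spark settings like these are typically supplied as a 'Configurations' block when the cluster is created (or cloned). Below is a minimal sketch; the specific memory, core, and parallelism values are illustrative placeholders, not the settings the team ultimately used:

```python
# Illustrative EMR Configurations block (values are placeholders, not tuned settings)
spark_configurations = [
    {
        # EMR-specific setting that sizes executors to the chosen instance type
        "Classification": "spark",
        "Properties": {"maximizeResourceAllocation": "true"},
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.memory": "40g",
            "spark.executor.cores": "8",
            "spark.driver.memory": "40g",
            "spark.default.parallelism": "1024",
        },
    },
]
# This list would be passed as the Configurations argument to run_job_flow
```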

Finally, here’s an example VariantSpark job command from spark-submit:

VariantSpark on EMR job submission parameters
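For readers who want to script job submission, here is a minimal sketch of submitting a Spark job to a running EMR cluster as a step, using boto3 and EMR's command-runner.jar. The cluster ID, jar path, main class, and VariantSpark arguments are placeholders; the actual class name and arguments come from the VariantSpark documentation:

```python
import boto3

emr = boto3.client("emr")

# Submitting a spark-submit invocation as an EMR step via command-runner.jar.
# The jar path, main class, and VariantSpark arguments below are placeholders.
step = {
    "Name": "variantspark-importance",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit",
            "--deploy-mode", "cluster",
            "--class", "<variantspark-main-class>",
            "s3://<your-bucket>/jars/variant-spark.jar",
            # VariantSpark-specific arguments (input file, label file,
            # number of trees, output path, ...) would follow here.
        ],
    },
}

emr.add_job_flow_steps(JobFlowId="j-XXXXXXXXXXXXX", Steps=[step])
```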

Understanding VS Job Run Steps

As we worked to create appropriate testing configurations, we considered the work steps involved in a VariantSpark job run. VariantSpark processes an input file and a label file and outputs the significant genomic variants. To do this, it uses several types of computational processes, which require differing amounts of CPU, RAM and IO.

To learn about the resource needs of each VariantSpark job phase, we used the Spark console output (integrated into EMR). There are 4 main stages in a VariantSpark analysis job run. Each stage performs an activity and requires different types and amounts of cluster resources. The stages include the following:

  • Zip with Index — prepares data; runs within minutes when the cluster is sized correctly; requires appropriate compute, IO and memory.
  • Count at Importance — loads data; runs within 10 minutes when the cluster is sized correctly; requires enough executor memory to fit all of the data (a rough sizing sketch follows this list). IMPORTANT: if cluster memory isn't sufficient, data will 'spill' to disk and this process runs 30–50x slower.
  • Run Decision Trees — analyzes the data using multiple steps per tree; run time is variable (minutes to hours) and depends on the (configured) number of trees; requires appropriate compute and memory. The number of decision trees impacts the quality of the analysis results. For our initial testing we started by running a very small number of trees (100); we later expanded our testing to include analyses which produced as many as 10,000 trees. Properly accounting for tree-building needs is a key aspect of scaling VariantSpark.
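As a back-of-the-envelope illustration of the 'fit all data in executor memory' requirement, here is a rough sizing sketch. The overhead factors are assumptions on our part, not measurements from the team's runs:

```python
# Rough sizing sketch for the 'Count at Importance' stage (assumptions, not measured values)
dataset_gb = 221              # 'Huge 2' parquet size, from the table above
node_ram_gb = 488             # nominal RAM of an r4.16xlarge node
yarn_fraction = 0.6           # assume ~60% of node RAM is usable by Spark executors
inflation = 2.0               # assume in-memory size is ~2x the on-disk parquet size

usable_per_node_gb = node_ram_gb * yarn_fraction
nodes_needed = (dataset_gb * inflation) / usable_per_node_gb
print(f"Roughly {nodes_needed:.1f} r4.16xlarge core nodes to hold the data in executor memory")
```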

Shown below is a VariantSpark job running in the Spark console. You can see the four types of job steps in the Spark console — Executors, Zip, Count and Decision Trees. You’ll note that the majority of the processing time is spent in ‘tree building’ (timestamps of 13:22–13:32 below).

Spark console output for a VariantSpark job run

Goal 4: Monitor for Effective Sizing

We worked with a number of tools and interfaces to understand the resources needed for each VariantSpark analysis job. These tools allowed us to ‘see into’ the job executions, so that we could understand how to configure and scale the AWS EMR resources. The tools that we used included the following:

  • Ganglia Console — live cluster resource allocation and usage
  • Spark Console — live job steps, executor resource usage and logs

EMR Server Load

Shown to the left and below are sample outputs from the included Ganglia monitoring tools for EMR.

We used these tools to understand whether or not we had appropriately sized and configured our EMR cluster. The first image shows server load distribution and utilization.

The second image shows how the resources on the cluster node workers are being used by the running VariantSpark job.

EMR worker load visualization using Ganglia

During testing we tried a number of EC2 instance types. We got the best results with the following:

  • Instance size — r4.16xlarge — reasons: competitive per-core price (64 vCPUs) and a large amount of RAM (488 GB)

The figures below show performance measured as the time to build 100 trees [left graph] and the number of trees built per hour [right graph] for the 'HUGE1' dataset. The figures compare the number of cores used on both the on-premises (blue, c16…) and the AWS (green, r4…) clusters.

Comparing ‘tree building’ on-premises (blue) vs AWS (green) Spark cluster performance

“The graphs above show that with the increase in the number of cores utilized we got a 5x increase in the number of trees built per hour and a similar order decrease in tree creation time (for a baseline of 100 trees).”

Success! To summarize, we found that for larger datasets a 5x performance improvement (AWS EMR vs. the on-premises cluster) can be achieved, due to efficiencies in the key decision-tree-building phase of each VariantSpark analysis job.

Goal 5: Minimize Service Costs

Now that we were able to successfully run the largest synthetic workload, using a repeatable process (EMR cluster cloning) and within a reasonable amount of time, we next wanted to understand how best to minimize AWS service costs.

Of course, a true cost comparison includes MORE than service costs. You may recall from the first blog post in this series that one key goal of this project was to speed up the feedback loop for data analysis. Below is a comparison of the 'true cost' of running VariantSpark in both environments.

Understand EMR Compute Pricing

We wanted to understand how best to purchase EMR services. We needed to consider a number of factors, including the quantity, type and size of EMR EC2 instances. We also needed to optimize EMR pricing by selecting the best-fit purchasing option for our needs. We considered the following pricing options:

  • reserved — we eliminated this option as we didn’t have regular usage needs at this point in time
  • spot — we quickly moved to this type of pricing and saved a sizable amount on service costs
  • spot instance fleet — we tried out this option and found it to be useful in early stages of testing. This is also called spot fleet.

We learned in this phase of our work that using EMR spot instances and spot fleets resulted in significant cost savings. We also learned that EC2 resources are allocated based on availability; we observed that resources tend to be allocated more quickly when we requested smaller-sized instances, i.e. r4.large vs r4.4xlarge, etc.
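For illustration, here is a minimal sketch of how a spot-based EMR instance fleet might be requested with boto3. The instance types, capacities, and timeout values are placeholders, not the team's actual fleet configuration:

```python
# Illustrative spot instance fleet request (values are placeholders)
core_fleet = {
    "Name": "core-fleet",
    "InstanceFleetType": "CORE",
    "TargetOnDemandCapacity": 0,
    "TargetSpotCapacity": 4,                       # request all core capacity as Spot
    "InstanceTypeConfigs": [
        {"InstanceType": "r4.16xlarge", "WeightedCapacity": 1},
        {"InstanceType": "r4.8xlarge", "WeightedCapacity": 1},  # an alternative type improves Spot availability
    ],
    "LaunchSpecifications": {
        "SpotSpecification": {
            "TimeoutDurationMinutes": 30,           # how long to wait for Spot capacity
            "TimeoutAction": "SWITCH_TO_ON_DEMAND",
        }
    },
}
# core_fleet would be included in the Instances={"InstanceFleets": [...]} argument to run_job_flow
```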

TIP: As mentioned, we found great value in the 'CLONE' button in the EMR console. After we created a cluster/Spark/VariantSpark job run configuration, we simply 'cloned' those settings to quickly request a new AWS EMR spot fleet to perform another test. We also really liked the flexibility of being able to use the AWS console to update any parameter values from the cloned set BEFORE we ran the next job instance.

Note: This work was done with CSIRO Bioinformatics / Data61 team members Dr. Denis Bauer, Arash Bayat and Piotr Szul.

Next Steps: Testing Containers and Automation

Now that we had completed one type of successful scaling of VariantSpark on the AWS cloud, we were eager to compare container-based services with what we had built on EMR.

In the next installment of this series, we’ll share our experiences when we tested how best to scale VariantSpark on Kubernetes on AWS via EKS.