Scaling Custom Machine Learning on AWS — Part 3 Kubernetes

This is part three of a multi-part series, which is designed to detail work with the team at CSIRO Bioinformatics on scaling their custom machine learning algorithm, VariantSpark on the AWS cloud.

VariantSpark is a machine learning library for genomic data. It’s built in Scala on Apache Spark core. VariantSpark implements a custom machine learning (RandomForest) algorithm, which includes splitting extremely wide input data for analysis. This input data can include up to 100 Million features.

  • In part one — we detailed the project itself and also preparation work on the VariantSpark library.
  • In part two — we detailed scaling VariantSpark analysis jobs on the AWS cloud using the Elastic Map Reduce or EMR service.

Goal 1: Use Machine Learning Containers

Because many people on this project had little experience with container-based workloads in general, we decided to first work with individual Docker containers for machine learning, before we moved to attempting to build an AWS container cluster for VariantSpark.

Understanding Docker

We took a two-pronged approach to our work. First, we worked with the CSIRO team to build and use a custom Docker container for bioinformatics, so that they come become familiar with these technologies via a relevant example. We built, tested, deployed and updated a custom container which included the following:

  • the common ‘blastn’ bioinformatics tool
  • the Jupyter notebook service
  • sample analysis data
  • a sample notebook

In this process we reviewed how to setup the desktop Docker tools, register a custom container on DockerHub and how to interact with a container locally. We used these tools, the docker cli and also Kitematic for local testing.

Evaluating AWS SageMaker

Second, we took a look at AWS SageMaker. SageMaker uses AWS-custom ML containers to provide scaling for common phases of the ML lifecycle. However it abstracts away much of the container orchestration processes by presenting the ML workflow phases. These phases are shown below:

AWS SageMaker ML lifecycle phases

To start working with SageMaker, we simply tried out a sample of the hosted Jupyter notebook instance feature. We really like the simplicity of setup and usability of the Jupyter Notebook feature. After this test, we were convinced that we should either leverage SageMaker’s ‘Notebook Instance’ feature or build our own container-based solution for Jupyter as part of our VariantSpark container architecture. We prefer using Jupyter notebooks (to a bash shell) for running VariantSpark analysis jobs due to Jupyter’s ability to run, document AND visualize results.

Next we ran end-to-end ML workloads using some of the publicly available SageMaker examples. We liked the separation between the ‘Jobs’ and ‘Models’ phases, as it showcased matching different sizes and quantities of compute (using containers) to the different scaling needs. We also found the ‘Endpoints’ functionality to be useful for deploying trained models to production applications.

During this phase we focused on the SageMaker-optimized implementation of the XGBoost algorithm using the example SageMaker notebook. We did for the following reasons:

  1. to understand the benefits of using AWS-optimized ML containers
  2. to see how the Jobs (training) and Models (hosting) phases worked
  3. to use an algorithm that had a similarity to VariantSpark (XGBoost uses a tree-based approach — blog which compares both algorithms)
  4. to try the hyperparameter tuning feature

We reviewed the available SageMaker Spark samples. We noticed that the only available SageMaker Spark sample used the Spark ML library and was written for Python (PySpark). You may recall that VariantSpark uses Spark core and is written in Scala. Also we noticed that PySpark sample required an EMR cluster, to pipe the data in and out of SageMaker for processing.

SageMaker can work with custom machine learning algorithms. We ran this sample as a POC to understand better what would be involved in the process of creating a custom ML container for SageMaker. We understood that to use VariantSpark with SageMaker, we would have to create one or more custom docker containers for VariantSpark, using the SageMaker guidelines. Although this idea intrigued us, around this time there was an interesting product announcement…

Spark 2.3 and Kubernetes

In March 2018, Databricks announced that Spark 2.3 could run with Kubernetes (rather than YARN) as its native job scheduler. Shown below is this architecture (from Databricks blog):

Spark 2.3 and Kubernetes

Around this time in our project, we also added a DevOps contractor to our team. We wanted to explore using Terraform templates for this portion of the project and our new contractor had experience with both Terraform and also container building and orchestration with Kubernetes and other orchestrators.

Goal 2: Select a Container Service

Now we were ready to select one or moreAWS container orchestration services for testing for scaling VariantSpark jobs using container clusters. There are a number of services to choose from on the AWS cloud. We considered the following for this project:

  • EKS (Elastic Kubernetes Service) — AWS service for managing containers using open source Kubernetes. We chose to test this service and will detail our work in the remainder of this blog post.
  • ECS (Elastic Container Service) — AWS service for managing containers using their own container-management system. We chose not to test this service, as we preferred to use Kubernetes.
  • Fargate w/ECS — AWS service for managing containers using their own higher-level container-management system. We chose not to test this service, same reason as above.
  • Fargate w/EKS — AWS service for managing containers using Kubernetes with a higher-level container-management system. At the time of work, Fargate w/EKS was not yet available, so we were unable to test this service.
  • SageMaker — AWS service for managed containers for machine learning workloads. We made an effort to test this service, but stopped to focus on Kubernetes, as SageMaker uses ECS. We preferred to work with Kubernetes.

We chose open source Kubernetes using EKS.

Starting our work with EKS, we wanted to first build a container that included Spark, Spark ML and Scala and run it locally. We found a couple of helpful blog posts to start this process. We were able to replicate this work quickly. We were particularly encouraged by being able to easily ‘containerize’ Spark and to interact with the local file system (rather than HDFS).

Build a Spark ML Kubernetes POC

For the next step, we wanted to get our custom Spark ML/Scala container working on EKS. To do this we created Terraform templates and also used the kops service in lieu of EKS (as the latter wasn’t yet publicly available for testing). The architecture we built from is is shown below.

Spark/Scala on EKS & S3

We were able to get the custom Spark/Scala SparkML job to run in the AWS EKS (kops) environment with S3 as our data store pretty quickly and easily. We posted our code sample on GitHub as ‘lynnlangit/Spark-Scala-EKS’.

Because we were interested in functionality, i.e. ‘would this work?’ at this point, we simply got the sample to work and then moved on to the next step. We did not focus on scalability yet — that would come later, when we worked with VariantSpark.

Building a VariantSpark Container

The next logical step in our work was to add the VariantSpark jar file to our Spark/Scala docker container, so that we could run VariantSpark importance analysis jobs. To do this, we first needed to create a build of VariantSpark for Spark 2.3, as this was required in order to be able to use Kubernetes (rather than Hadoop YARN) for container orchestration.

We produced a VariantSpark jar file versioned 2.3 with appropriate dependencies (on GitHub — here), then we worked to update our existing Spark/Scala container with this Kubernetes-compatible VariantSpark jar file.

The next step was local testing. We used the Docker Community Edition tools to test our single container VariantSpark job run. This test worked without issue. After that we wanted to test with Kubernetes locally. We used the Kubernetes minkube to run this test. Fortunately, this also run just fine. We used the MVC (minimum viable cluster — two pods) for this test.

At this time we were still in the ‘will this configuration actually work?’ phase of our testing. We ran sample VariantSpark jobs locally with very small amounts of data (using the ‘Tiny’ data sample — from below) so that we could get quick feedback during and after each job run attempt. We were not yet to the point of testing scaling.

Goal 3: Setup VS-EKS for Testing

Finally we were ready to test our custom VariantSpark container on EKS. During this time AWS released the Kubernetes-native EKS service to GA (general availability) which removed the need for us to use the kops service.

Below is an architecture diagram. You’ll see that key components are the EKS cluster and S3 buckets. The S3 buckets hold both the source (data for analysis) as well as Kubernetes configuration files. It’s important to understand that this configuration eliminates the need for EMR, as all data is stored in S3 or EBS.

You’ll note that we configured the Jupyter notebook service to use a persistent EBS volume and a persistent Kubernetes pod. We wanted to use a stateful storage mechanism for our preferred client notebook interface. We found this blog post helpful in understanding required configuration steps to do this for a Kubernetes cluster.

VariantSpark on AWS EKS configuration

We created Terraform templates to implement our desired AWS configuration. Here is a short screencast which shows setting up a cluster, running a job and deleting a cluster using Terraform templates.

Setting up a VariantSpark EKS cluster using Terraform templates

Here are the instructions to set client dependencies and to run these Terraform templates — Github link.

As with our earlier on-premises test of VariantSpark on Kubernetes, when we started testing on EKS, we simply ran small-sized importance analysis jobs so that we could verify that our configuration was correct.

Goal 4: Plan Configuration & Parameters

Now that we had a working sample environment, our next step was to start the work to testing scaling VariantSpark jobs. At this point, we wanted to be able to compare the resources needed for on-premise, EMR or EKS. We worked diligently to use the most similar configuration for our testing.

It’s important to note that we are testing VariantSpark EKS with Spark 2.3 (required for Kubernetes). EMR and on-premises testing used Spark 2.2. As with EMR, we categorize the configuration and parameter settings by type to start to build our test environment. Below is a grid with these categories.

Services Types

The Kubernetes Master works with the Spark Driver on EKS

As with our EMR configurations, we tested many combinations of EKS configurations and Spark/VariantSpark job execution parameters. We were able to get a ‘like-for-like’ comparison, after we understood where to get parameter changes in each environment. EMR uses Hadoop (YARN), whereas EKS uses Kubernetes/Spark Driver settings.

Review VariantSpark Job Details

As we worked to create appropriate testing configurations, we considered the work steps involved in a VariantSpark job run. VariantSpark processes an input file and a label file and outputs significant (genomic) variants. To do this, there are various types of computation processes. Each process requires CPU, RAM and IO.

To verify resources needed for VariantSpark job phases, we proxied to the Spark UI during VariantSpark-EKS job runs. There are 4 main stages to a VariantSpark job analysis run. Each of these stages performs an activity and requires different types and amounts of cluster resources. The stages are as follows:

  • Prepare Spark Executors — runs within milliseconds, requires minimal resources
  • Zip with Index — prepares data, runs within minutes, requires appropriate compute, IO and memory.
  • Count at Importance — loads data, runs within 10 minutes, requires enough executor memory to fit all data. IMPORTANT: if cluster memory isn’t sufficient, then data will ‘spill’ to disk and this process runs 30–50x slower.
  • Run Decision Trees — analyzes data using multiple steps for each tree, variable run time (minutes to hours), depends on (configured) number of trees, requires appropriate compute and memory. The number of Decision Trees impacts the quality of the analysis results. For our initial testing we started by running a very small number of trees (100), we later expanded our testing to include analysis which produced as many as 10,000 trees. Properly accounting for tree building needs is a key aspect of scaling VariantSpark.

Shown below is a VariantSpark job running in the Spark console. You can see the four types of job steps in the Spark console — Executors, Zip, Count and Decision Trees.

VariantSpark job steps in the Spark Web UI

Goal 5: Monitor for Effective Sizing

We worked with a number of tools and interfaces to understand the resources needed for each VariantSpark analysis job. These tools allowed us to ‘see into’ the job executions, so that we could understand how to configure and scale the AWS EKS resources. The tools that we used included the following:

  • Kubernetes Web UI — pods and nodes (also the ‘kubectl’ Kubernetes client tool)
  • Terraform Templates-- output from terraform script runs told us when we exceeded AWS allocated resources (i.e. not enough of a certain size or type of EC2 instance, or out of EIP addresses, etc…)
  • Spark Web UI — live job steps, executor resource usage and logs

Shown to the below are sample outputs from the included Kubernetes UI.

Viewing an undersized cluster for a VariantSpark job in Kubernetes UI

We used these tools to understand whether or not that we had appropriately sized and configured our EKS cluster. The first image shows VariantSpark job step progress, data size and duration.

Verifying the VariantSpark EKS job step durations via the Spark UI

The image below shows use of the Spark UI to verify cluster-sizing for the data load (count or cache) phase of a VariantSpark job.

Verifying 100% of data is cached via the Spark UI

Success! We were able to repeatedly run a variety of VariantSpark workloads using EKS and S3 within a similar amount of compute time and, often, even less CPUs and RAM in our Kubernetes cluster than we needed to run comparable workloads in EMR.

Next Steps: From Batch to Pipeline

As with any project, although we created repeatable processes for scaling VariantSpark analysis jobs on the AWS cloud, there is still more work to be done.

Envisioning the Future

In interviewing research bioinformaticians on site at CSIRO, we worked together to understand their current analysis workflow and also to envision a ‘dream’ VariantSpark analysis workflow. We captured our conversation on a whiteboard (shown below).

Vision for VariantSpark Pipeline

The sketch above details a ‘warm’ pipeline that includes the following:

  1. CLIENT — Jupyter notebook client, which connect to multiple AWS endpoints. These endpoints are presented as public (API) gateway endpoints to AWS compute processes (Lambda, EKS, EMR, etc…)
  2. DATA — A lambda-driven job analysis-process, which kicks off by evaluating the size, type and quantity of data in one or more designated S3 buckets. It then calls an intermediate lambda which generates an analysis of needed AWS compute resources given the input data size. This lambda estimates time to prepare the compute resources, time to run them and estimated service cost. If input data size is larger than a defined threshold, then data compression, conversion and/or partitioning prior to analysis would be suggested and communicated back to the user via a message in the notebook client.
  3. DATA PREP (optional) — For ‘large’ data , the user could choose to initiate a ‘data prep’ step via the notebook interface which would then initiate downstream compute to compress, convert and/or partition input data and would output optimized data to a destination S3 bucket.
  4. ANALYSIS — the user would be presented with a set of choices in the notebook, i.e. ‘run job using AWS’ or ‘run job locally’. These choices would include estimated time to prepare environment, time to execute job and estimated compute cost (for AWS services).

    — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Taking Stock of Current State

We were able to successfully run a large synthetic workload using an automated, documented repeatable process (EKS via Terraform templates) within a reasonable amount of time, service and team (labor) cost.

A number of questions remain. These include the following:

  1. Can we use EKS use EC2 spot instances (requires EKS auto-scaler as well)? The answer appears to be yes — blog here.
  2. Can we use EKS auto-scaling for precision (rather than EC2 auto-scaling)? Again, the answer appear to be yes w/Kubernetes 1.12+ or above — link
  3. Can we use AWS tools to build CI/CD for EKS/custom Docker container and the VariantSpark jar file itself? We believe so, a partial potential architecture is shown below.
Potential CI/CD Pipeline for VariantSpark jar file

Although we’ve made good progress in our work to enable cloud-based, repeatable, faster feedback loops for VariantSpark analysis to date, there remains much work to do to implement the vision of moving from batch-based to full pipeline analysis.

Thanks to Dr. Denis Bauer & Arash Barat/CSIRO Bioinformatics, Piotr Szul/Data 61 and my team of Jim Counts, Matthew Jones and Samantha Langit for this work.

Cloud Architect who codes, Angel Investor