Scaling Keras on Kubernetes with Kubeflow

Scalability is one of the major requirements for resource-intensive ML workloads and models that consist of multiple layers, billions of weights, and complex error-minimization logic. Processing client requests for complex models like recurrent neural nets, deep convolutional networks, or generative models takes up a significant amount of CPU time and compute power. These resources need to be dynamically scaled once the model gains traction among users. Additionally, ML models need to be regularly retrained on newly ingested data. Consequently, the growth of datasets requires that more and more compute power be available on-demand. 

Kubernetes is a powerful container orchestration platform that meets the scalability requirements of modern ML workflows and applications. The platform supports various abstractions for the creation of stateful replicated deployments, autoscaling, high availability, and service discovery, which make it easy to serve and retrain ML models.

In this blog post, I show how to scale Keras ML models on Kubernetes using the Kubeflow ML platform. First, I discuss the Keras deep learning library and its key features, then walk through the process of deploying Keras training jobs with Kubeflow. I cover Kubeflow's training job abstractions and how to implement distributed training of Keras models, and then move on to the Kubeflow and Kubernetes tools suitable for serving Keras models. In the last part of this tutorial, I show you how to use Seldon Core and other serving frameworks to efficiently scale Keras models on Kubernetes in production. 

Overview of Keras 

Keras is a popular Python library that provides high-level APIs and tools for implementing deep learning models. It offers many pre-built neural network layers (e.g., ReLU, Softmax, and fully connected layers), loss functions, and evaluation metrics that allow you to easily compose a neural architecture of any complexity without implementing low-level error-minimization logic using differentiable programming.

Originally, Keras supported various ML backends such as Theano and TensorFlow, and users could specify which backend they wanted to use to power the high-level Keras API. Since the TensorFlow 2 release, Keras has mainly supported the TensorFlow backend and is now the official high-level deep learning API for TensorFlow, provided via the tf.keras module. This gives you the user-friendliness of Keras while keeping access to all low-level TensorFlow classes.
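
To illustrate, here is a minimal sketch of a Keras model defined through the tf.keras module; it composes pre-built layers, a loss function, and a metric without any low-level gradient code (the MNIST-style input shape is only an example):

import tensorflow as tf

# A small feed-forward classifier built from pre-defined Keras layers
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),    # flatten 28x28 images
    tf.keras.layers.Dense(128, activation="relu"),    # fully connected hidden layer
    tf.keras.layers.Dense(10, activation="softmax"),  # class probabilities
])

# Keras handles gradient computation and error minimization internally
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)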

Below, I’ll discuss how to scale Keras models as part of TensorFlow on Kubernetes using Kubeflow. But before diving into some practical examples, I first want to discuss why Kubeflow is a great tool for Keras. 

What Is Kubeflow?

Kubeflow is an ML platform for Kubernetes designed to automate ML development, testing, and deployment. It provides useful tools for ML model containerization, model optimization, training, and serving. One of the best features of Kubeflow is Kubeflow Pipelines, which lets you create reusable ML workflows composed of multiple components. 

Other useful Kubeflow features include:

      • Multi-framework support for popular ML libraries 
      • Flexible containerization of ML code with Kubeflow Fairing
      • ML model logging and metadata management 
      • Composable ML workflows using Kubeflow Pipelines
      • Distributed training using TF Operator and the MPI Operator
      • GPU-based training
      • Advanced hyperparameter optimization and neural architecture search (NAS) with Kubeflow Katib
      • Multi-platform serving frameworks for deploying ML models

In addition, Kubeflow is a great tool for scaling Keras models. In particular, it provides custom resources for distributed training of TensorFlow/Keras models and autoscaling of inference services.

Training and Scaling Keras Models in Kubernetes

All the examples below assume that you have an operational Kubernetes cluster and Kubeflow deployed to this cluster. To learn how to install Kubeflow on Kubernetes, you can check the official Kubeflow guides for EKS, GKE, and AKS.

Scaling ML training means being able to increase the number of running workers/learners, CPUs/GPUs, or other compute resources available in the cluster. However, scaling Keras training jobs is not so straightforward, because training an ML model with multiple workers requires a distributed strategy. Such a strategy allows for substantially faster training iterations and speeds up ML experimentation and CI/CD pipelines, but it is hard to build because it involves complex coordination between learners and aggregation of parameters. 

In general, there are two main approaches to distributed training popular in the ML community today: synchronous and asynchronous training. 

In synchronous distributed training, each worker processes its own part of the training dataset, and the workers communicate with each other to aggregate their gradients before updating the model. The most popular approach to synchronous training is based on all-reduce algorithms. 

In contrast, in asynchronous training, all learners work on the complete training data independently and update parameters asynchronously. Async training is implemented via a parameter server architecture, in which a parameter server aggregates and applies the parameter updates and coordinates the interactions between workers. 

Distributed Training with TFOperator

The Kubeflow TFOperator and its TFJob custom resource allow you to run distributed training jobs for Keras models using both approaches. To enable distributed training, the TF/Keras model code should implement one of the distribution strategies supported by TensorFlow. Among the synchronous strategies, the tf.distribute.Strategy module supports MirroredStrategy, TPUStrategy, and MultiWorkerMirroredStrategy. It also provides the ParameterServerStrategy for asynchronous training with parameter servers. For more information about the available TF distribution strategies and how to implement them in your Keras code, see this article.
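
For example, the simplest synchronous option, MirroredStrategy, replicates the model across the GPUs of a single worker and can be enabled with just a few lines. This is only a sketch; build_model() and train_dataset stand in for your own model and input pipeline:

import tensorflow as tf

# MirroredStrategy performs synchronous all-reduce training
# across all GPUs visible to a single worker
strategy = tf.distribute.MirroredStrategy()
print("Number of replicas in sync:", strategy.num_replicas_in_sync)

with strategy.scope():
    # Model building and compiling must happen inside the strategy scope;
    # build_model() is a placeholder for your own Keras model code
    model = build_model()

# train_dataset is a placeholder for your tf.data input pipeline
model.fit(train_dataset, epochs=10)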

TFOperator enables the implementation of these strategies in Kubernetes via several components. 

      • Chief: The component that orchestrates training and performs checkpointing of the model.
      • Parameter server (PS): Provides data store for model parameters and performs gradient updates.
      • Worker: The entity that executes training.
      • Evaluator: The component that computes model evaluation metrics during the training.
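
Each of these components runs as a pod created by the TF Operator, which injects a TF_CONFIG environment variable describing the cluster layout and the pod's own role; tf.distribute strategies such as MultiWorkerMirroredStrategy pick it up automatically. Here is a small sketch of how the training code can inspect it:

import json
import os

# TFJob injects TF_CONFIG into every replica; tf.distribute reads it automatically,
# but the code can also inspect it, e.g., to run checkpointing on the chief only
tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
cluster = tf_config.get("cluster", {})  # e.g., {"worker": [...], "ps": [...]}
task = tf_config.get("task", {})        # e.g., {"type": "worker", "index": 0}

is_chief = task.get("type") == "chief" or (
    task.get("type") == "worker" and task.get("index") == 0
)
print("Cluster spec:", cluster)
print("Running as:", task.get("type"), task.get("index"), "chief:", is_chief)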

For example, you could use a manifest like the one below to support either a parameter server strategy or a mirrored strategy, depending on the actual logic of your Keras and TensorFlow code: 

apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
  name: "mnist-tf-job"
spec:
  tfReplicaSpecs:
    PS:
      replicas: 2
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: tensorflow
              image: docker.io//tf-dist-mnist-test:1.0
    Worker:
      replicas: 2
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: tensorflow
              image: docker.io//tf-dist-mnist-test:1.0

This manifest defines two workers and one parameter server. Correspondingly, the ML developer can implement one of the strategies compatible with this TFJob configuration in their Keras code. For example, below is an excerpt of the TF code that uses the MultiWorkerMirroredStrategy designed for all-reduce style training across multiple nodes:

def main(args):

  # MultiWorkerMirroredStrategy creates copies of all variables in the model's
  # layers on each device across all workers.
  # If your GPUs don't support NCCL, replace the NCCL communication option
  # with another one (e.g., RING or AUTO).
  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
      communication=tf.distribute.experimental.CollectiveCommunication.NCCL)

  BATCH_SIZE_PER_REPLICA = 64
  BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

  with strategy.scope():
    ds_train = make_datasets_unbatched().batch(BATCH_SIZE).repeat()
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = \
        tf.data.experimental.AutoShardPolicy.DATA
    ds_train = ds_train.with_options(options)
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = build_and_compile_cnn_model()

  # Define the checkpoint directory to store the checkpoints
  checkpoint_dir = args.checkpoint_dir
  # ...

As you can see, tf.distribute.experimental.MultiWorkerMirroredStrategy is used with the NVIDIA Collective Communication Library (NCCL), which provides multi-GPU and multi-node communication optimized for NVIDIA GPUs. As part of the distribution strategy, you also need to define a batch size (data slice) for each worker. 

Once the distribution strategy is defined in the code and the code is provided to the TFJob, the TF Operator launches the workers and the other components defined in the manifest, and the strategy splits each global batch among them. The operator also monitors the training process to ensure that training proceeds smoothly. 
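
If you prefer to submit jobs programmatically instead of with kubectl, the Kubernetes Python client can create the TFJob custom resource directly. Below is a sketch, assuming the kubernetes and PyYAML packages are installed, the manifest above is saved as tf_job.yaml, and jobs run in the kubeflow namespace:

import yaml
from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() inside a pod

with open("tf_job.yaml") as f:  # the TFJob manifest shown above
    tfjob = yaml.safe_load(f)

api = client.CustomObjectsApi()
api.create_namespaced_custom_object(
    group="kubeflow.org",
    version="v1",
    namespace="kubeflow",  # adjust to the namespace you deploy jobs to
    plural="tfjobs",
    body=tfjob,
)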

Distributed Training with MPI Operator

As an alternative to scaling your Keras models with TFJob, you can use MPI Operator, which supports multiple ML frameworks including TF and Keras. 

Under the hood, the MPI Operator uses the Message Passing Interface (MPI), a standardized API for parallel and distributed computing that enables efficient communication between nodes and abstracts away different network types and communication protocols. MPI provides a way to organize communication between the workers performing distributed training. 

Now, I’ll take you through how to perform all-reduce-style distributed training using MPI Operator.

The Kubeflow 1.0 installation includes MPI Operator by default, so you already have it in your cluster if you followed the official installation guide.

For the fastest way to test how MPI can scale your Keras training jobs with distributed all-reduce training, refer to the example from the MPI Operator's official repository. It uses a container that runs the TensorFlow CNN benchmarks for the ResNet image classification model. The MPIJob custom resource for this training job looks as follows:

apiVersion: kubeflow.org/v1alpha2
kind: MPIJob
metadata:
  name: tensorflow-benchmarks
spec:
  slotsPerWorker: 1
  cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - image: mpioperator/tensorflow-benchmarks:latest
            name: tensorflow-benchmarks
            command:
            - mpirun
            - --allow-run-as-root
            - -np
            - "2"
            - -bind-to
            - none
            - -map-by
            - slot
            - -x
            - NCCL_DEBUG=INFO
            - -x
            - LD_LIBRARY_PATH
            - -x
            - PATH
            - -mca
            - pml
            - ob1
            - -mca
            - btl
            - ^openib
            - python
            - scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py
            - --model=resnet101
            - --batch_size=64
            - --variable_update=horovod
    Worker:
      replicas: 2
      template:
        spec:
          containers:
          - image: mpioperator/tensorflow-benchmarks:latest
            name: tensorflow-benchmarks
            resources:
              limits:
                nvidia.com/gpu: 1

Let's go over the key MPI parameters used in this manifest. The main parameter of the top-level spec is slotsPerWorker; Open MPI defines a slot as an allocatable unit for a single process, which in practice corresponds to the number of CPUs/GPUs a worker can use. 

Under mpiReplicaSpecs, you define the configuration for the MPI launcher, which implements the control logic for the distributed training. The launcher pod starts training jobs using the mpirun program, which executes serial and parallel jobs in Open MPI. The launcher spec also configures all the environment variables needed to execute mpirun commands on the remote workers.

The manifest uses a number of mpirun parameters described in the official Open MPI documentation:

      • np: The number of copies of the program (i.e., model replicas) to run on the worker nodes (two in this spec)
      • map-by: Maps the process to a specified object such as a socket, core, slot, or processor cache (worker slots in this example)
      • x: Exports variables to remote nodes before program execution (here, debugging and path information for logging and model evaluation)
      • mca: Passes parameters to various MCA (Modular Component Architecture) modules that configure different communication channels (point-to-point communication, BTL, etc.) 
          • pml: Handles point-to-point communication 
          • ob1: A component in the PML framework that executes communication using BTL components (more info here)

The last part of the MPI manifest defines the number of workers. Each worker replica requests one GPU and typically lands on its own node, so the cluster should have at least two GPU-enabled nodes for this example to work.
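
For reference, the training script that the launcher executes via mpirun typically relies on Horovod's Keras bindings, which is what the --variable_update=horovod flag in the benchmark above refers to. Below is a minimal sketch of such a script, assuming the horovod package is installed in the worker image; build_model() and train_dataset are placeholders for your own code:

import horovod.tensorflow.keras as hvd
import tensorflow as tf

hvd.init()  # initialize Horovod on top of the MPI communicator

# Pin each local MPI process to a single GPU
gpus = tf.config.list_physical_devices("GPU")
if gpus:
    tf.config.set_visible_devices(gpus[hvd.local_rank()], "GPU")

model = build_model()  # placeholder for your own Keras model code

# Scale the learning rate by the number of workers and wrap the optimizer
# so that gradients are averaged across workers with all-reduce
opt = hvd.DistributedOptimizer(tf.keras.optimizers.Adam(0.001 * hvd.size()))
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=opt,
              metrics=["accuracy"])

callbacks = [
    # Make sure all workers start from the same initial weights
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
if hvd.rank() == 0:
    # Only the rank-0 worker writes checkpoints
    callbacks.append(tf.keras.callbacks.ModelCheckpoint("./checkpoints/ckpt-{epoch}"))

# train_dataset is a placeholder for your tf.data input pipeline
model.fit(train_dataset,
          epochs=10,
          callbacks=callbacks,
          verbose=1 if hvd.rank() == 0 else 0)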

Serving Keras Models Using Kubernetes and Kubeflow Tools

Kubeflow provides several model serving options for Keras with single-framework and multi-framework support.

If you want to use the TFX (TensorFlow Extended) native tools for serving Keras models, you can start with TF Serving, the TFX component for serving TF models that Kubeflow wraps for deployment on Kubernetes. TF Serving supports model versioning and traffic splitting among different model versions, service rollouts, automatic lifecycle management, data source discovery, and many more features.

A multi-framework alternative for serving Keras models on Kubernetes is Seldon Core. This tool allows you to convert ML models created with TF, PyTorch, and other ML frameworks into REST/gRPC microservices. It supports features such as autoscaling, advanced metrics, request logging, outlier detection, A/B tests, canary rollouts, and more. 

Another multi-framework serving tool that supports autoscaling of Keras models is KFServing. Under the hood, KFServing uses the Knative Autoscaler, which is automatically enabled once the inference service is deployed. Knative autoscaling is based on the average number of incoming requests per pod. KFServing sets the target concurrency to 1 by default, so if the service is loaded with 5 concurrent requests, the deployment is autoscaled to 5 pods. 

Yet another alternative for autoscaling Keras-trained models is BentoML, which comes with support for all the top ML training frameworks, such as TensorFlow, Keras, and XGBoost.

To give you an idea of how to autoscale Keras-served models with Kubeflow, I'll use the example of Seldon Core. Note that before using Seldon Core to serve Keras models, you need to manually deploy it to your K8s cluster. 

Seldon Core provides a set of optimized model servers for popular DL and ML frameworks, including TensorFlow, which can be used out of the box to serve Keras models. To serve a model, you only need to upload the model binaries to your preferred object store, such as an AWS S3 bucket or Google Cloud Storage. A simple manifest for a trained Keras model with saved model parameters can look as follows: 

apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  name: tfserving
spec:
  name: mnist
  predictors:
  - graph:
      children: []
      implementation: TENSORFLOW_SERVER
      modelUri: gs://seldon-models/tfserving/mnist-model
      name: mnist-model
      parameters:
        - name: signature_name
          type: STRING
          value: predict_images
        - name: model_name
          type: STRING
          value: mnist-model
    name: default
    replicas: 1

This manifest defines a preconfigured TF server that serves the TF MNIST model. The Seldon Core inference graph specified under the graph section can also define interactions between different model components or an ensemble of models. 
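
Before applying a manifest like this for your own model, the trained Keras model has to be exported in the TensorFlow SavedModel format and uploaded to the object store referenced by modelUri. Here is a minimal sketch; the bucket path is a placeholder, and the version suffix /1 follows the directory layout TF Serving expects:

import tensorflow as tf

model = build_and_compile_cnn_model()  # placeholder for your own model code
model.fit(train_dataset, epochs=10)    # train_dataset is your input pipeline

# Export the SavedModel that TF Serving (and Seldon's TENSORFLOW_SERVER) can load.
# TensorFlow can write directly to gs:// or s3:// paths if the corresponding
# filesystem support is available; otherwise, save locally and upload the
# directory with gsutil or the AWS CLI.
model.save("gs://your-bucket/tfserving/mnist-model/1")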

Once the model is deployed, it can be accessed using the Seldon OpenAPI schema. For example:

http://<ingress_url>/seldon/<namespace>/<model-name>/api/v1.0/doc/
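
Once the endpoint is up, you can also send prediction requests to the standard Seldon REST path. A quick smoke test might look like the sketch below; the ingress URL, namespace, and payload shape are placeholders for your own deployment:

import requests

INGRESS_URL = "http://<ingress_url>"  # placeholder for your cluster's ingress
NAMESPACE = "seldon"                  # placeholder for the deployment namespace
DEPLOYMENT = "tfserving"              # matches metadata.name in the manifest above

url = f"{INGRESS_URL}/seldon/{NAMESPACE}/{DEPLOYMENT}/api/v1.0/predictions"
payload = {"data": {"ndarray": [[0.0] * 784]}}  # one flattened 28x28 MNIST image

response = requests.post(url, json=payload)
print(response.status_code, response.json())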

Keras models deployed using Seldon Core can be easily autoscaled using the Kubernetes Horizontal Pod Autoscaler (HPA). It can dynamically increase the number of pods based on user-defined metrics such as CPU/RAM load or network requests. 

To autoscale Seldon-served models, make sure you have the following:

      • A resource request pointing to the metric to be scaled (e.g., CPU or memory)
      • An HPA spec referring to the Seldon deployment

The manifest with the HPA can look something like this:

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: seldon-model
spec:
  name: test-deployment
  predictors:
  - componentSpecs:
    - hpaSpec:
        maxReplicas: 3
        metrics:
        - resource:
            name: cpu
            targetAverageUtilization: 10
          type: Resource
        minReplicas: 1
      spec:
        containers:
        - image: seldonio/mock_classifier_rest:1.3
          imagePullPolicy: IfNotPresent
          name: classifier
          resources:
            requests:
              cpu: '0.5'
        terminationGracePeriodSeconds: 1
    graph:
      children: []
      endpoint:
        type: REST
      name: classifier
      type: MODEL
    name: example

In this example, the HPA would scale the served Keras model up to three replicas based on the average CPU utilization across its pods.

Conclusion

As I’ve shown in this article, Kubernetes and Kubeflow provide many tools for scaling Keras models in distributed compute environments. Kubernetes Horizontal Pod Autoscaler (HPA) can be used as part of the Seldon Core serving deployment to scale replicas based on the observed CPU, network, and memory metrics. Similarly, you can enable autoscaling for served models using the built-in KFServing autoscaler based on Knative Serving. 

One of the best features for scaling Keras provided by Kubeflow on Kubernetes is undoubtedly distributed training using synchronous and asynchronous approaches. Distributed training is available as part of the TFJob, which understands TF distribution strategies, and the MPI Operator, which is based on the Open MPI framework. There are many other scaling options not covered in this article, including manual scaling via K8s-native resources such as StatefulSets and Deployments. 

No matter what option you select, Kubeflow and Kubernetes autoscaling can dramatically speed up training of your Keras models on Kubernetes. The upside of autoscaling is a more efficient ML development workflow, which enables faster experimentation and prototyping and thus faster time to market (TTM) for your Keras models.
