Running Argo Workflows Across Multiple Kubernetes Clusters

We recently open-sourced multicluster-scheduler, a system of Kubernetes controllers that intelligently schedules workloads across clusters. In this blog post, we will use it with Argo to run multicluster workflows (pipelines, DAGs, ETLs) that better utilize resources and/or combine data from different regions or clouds.

Most enterprises that use Kubernetes manage multiple clusters. For various reasons, you may have one or several clusters per team, region, environment, or combination thereof. Your clusters may be hosted by different cloud providers—in a multicloud infrastructure—and/or on premises—in a hybrid infrastructure. The benefits of isolation, however, come at the expense of, among other things, reduced bin-packing efficiency and data fragmentation. Let’s explore two scenarios.

  • Scenario A: You need to run a large parallel workflow, e.g., a machine learning training pipeline, which requires more resources than available in your team’s cluster. You could scale out to go fast, or limit parallelism to save money. In the meantime, available resources in other teams’ clusters stay idle. Multicluster-scheduler allows you to elect pods to be delegated to other clusters, where resources are available, from the comfort of your own cluster, with a single pod or pod template annotation.
  • Scenario B: You need to run a workflow that combines data from multiple clouds or regions. It is either more efficient or required to run some steps closer to their data sources. To optimize throughput or save on data egress charges, you may want to compress or aggregate the data, before loading the results closer to you. Or to respect privacy regulations and minimize your attack surface, you may want to anonymize the data as upstream as possible. You could deploy remote services or functions and call them from your workflow, but that would be complicated. Multicluster-scheduler allows you to simply specify which cluster a pod should run in, again, with a single pod or pod template annotation.

A Multicluster Pod’s Journey

Here’s a quick summary of a multicluster pod’s journey.

  1. When a pod is created with the multicluster.admiralty.io/elect="" annotation, the multicluster-scheduler agent’s mutating pod admission webhook replaces the pod’s containers by a dummy busybox that just waits to be killed. The original spec is saved for later as another annotation.
  2. We call the resulting pod a proxy pod. The agent then sends an observation of the proxy pod to the scheduler’s cluster, which can be the same cluster. The agent also watches other pods, nodes, and node pools and sends observations of them to the scheduler’s cluster to guide its decisions.
  3. The scheduler creates a delegate pod decision in its own cluster. If the original pod was annotated with multicluster.admiralty.io/clustername=foo, the delegate pod decision is targeted at cluster “foo”. Otherwise, the scheduler targets the cluster that could accommodate the most replicas of our pod, based on current observations. More advanced scheduling options are in the works.
  4. The agent in the target cluster sees the decision and creates a delegate pod. The delegate pod has the same spec as the original pod.
  5. An observation of the delegate pod is sent back to the scheduler’s cluster. When the delegate pod is annotated, the same annotation is fed back to the proxy pod (e.g., so Argo can read step outputs), and when it succeeds or dies, a signal is sent to the proxy pod’s container for it to succeed or die too.

For more details, check out the README.

Demonstration

Let’s see that in action. Create two clusters, e.g., with Minikube or your favorite cloud provider. In this blog post, we’ll assume their associated contexts in your kubeconfig are “cluster1” and “cluster2”, but we’ll use variables so you can use your own:

CLUSTER1=cluster1 # change me
CLUSTER2=cluster2 # change me

Now, following the installation guide, install the scheduler in cluster1 and the agent in both clusters.

We also need Argo in either cluster. We’ll use cluster1 in this guide, but feel free to change the variables:

ARGO_CLUSTER=$CLUSTER1 # change me
NON_ARGO_CLUSTER=$CLUSTER2 # change me

Install the Argo controller and UI (step 2 of the Argo getting gtarted guide):

kubectl --context $ARGO_CLUSTER create ns argo
kubectl --context $ARGO_CLUSTER apply -n argo -f https://raw.githubusercontent.com/argoproj/argo/v2.2.1/manifests/install.yaml

We’re not big fans of giving Argo pods admin privileges, as recommended in step 3 of the Argo getting started guide, so we’ll use a minimal service account instead. Because pods will run in the two clusters, we need this service account in both:

ARGO_POD_RBAC=https://raw.githubusercontent.com/admiraltyio/multicluster-scheduler/master/config/samples/argo-workflows/_service-account.yaml
kubectl --context $ARGO_CLUSTER apply -f $ARGO_POD_RBAC
kubectl --context $NON_ARGO_CLUSTER apply -f $ARGO_POD_RBAC

Let’s also install the Argo CLI locally—although it’s optional—to nicely submit and track workflows:

# On Mac:
brew install argoproj/tap/argo
# On Linux:
curl -sSL -o /usr/local/bin/argo https://github.com/argoproj/argo/releases/download/v2.2.1/argo-linux-amd64
chmod +x /usr/local/bin/argo
# On Windows:
curl -sSL -o argo https://github.com/argoproj/argo/releases/download/v2.2.1/argo-windows-amd64 # and add to your PATH

(UPDATE, 2019-03-17, multicluster-scheduler v0.3) Finally, let’s enable multicluster-scheduler in the default namespace:

kubectl --context $ARGO_CLUSTER label ns default multicluster-scheduler=enabled

You can now turn any Argo workflow into a multicluster workflow by adding multicluster.admiralty.io annotations to its pod templates. Also, don’t forget to specify resource requests if you want the scheduler to decide where to run your pods.

Scenario A: Optimizing a Large Parallel Workflow

A default GKE cluster has three nodes, with 1 vCPU and 3.75GB of memory each, out of which 940m vCPU and 2.58GiB of memory are allocatable. The system pods, along with multicluster-scheduler and Argo already request 1840m vCPU in cluster1 and 1740m vCPU in cluster2. Therefore, cluster1 has 980m vCPU available and cluster2 has 1080m. We don’t need to spend extra money for this experiment: we will model a “large parallel workflow” by 10 parallel steps requiring 200m vCPU each (including 100m for the Argo sidecar).

First, let’s run the following single-cluster workflow (also available in the multicluster-scheduler samples directory):

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: singlecluster-parallel-
spec:
  entrypoint: singlecluster-parallel
  templates:
  - name: singlecluster-parallel
    steps:
    - - name: sleep
        template: sleep
        withItems: [0, 1, 2, 3, 4, 5, 6, 7, 9, 10]
  - name: sleep
    container:
      image: busybox
      command: [sleep, 10]
      resources:
        requests:
          cpu: 100m # Note: Argo sidecar adds another 100m

Submit it:

argo --context $ARGO_CLUSTER submit --serviceaccount argo-workflow --watch https://raw.githubusercontent.com/admiraltyio/multicluster-scheduler/master/config/samples/argo-workflows/blog-scenario-a-singlecluster.yaml

Here’s the final state:

Duration:            1 minute 16 seconds

STEP                             PODNAME                                  DURATION  MESSAGE
 ✔ singlecluster-parallel-6rtkc
 └-·-✔ sleep(0:0)                singlecluster-parallel-6rtkc-839758060   11s
   ├-✔ sleep(1:1)                singlecluster-parallel-6rtkc-1823198064  12s
   ├-✔ sleep(2:2)                singlecluster-parallel-6rtkc-4064072188  11s
   ├-✔ sleep(3:3)                singlecluster-parallel-6rtkc-2040401880  27s
   ├-✔ sleep(4:4)                singlecluster-parallel-6rtkc-3078784476  27s
   ├-✔ sleep(5:5)                singlecluster-parallel-6rtkc-3529283624  27s
   ├-✔ sleep(6:6)                singlecluster-parallel-6rtkc-3081898924  43s
   ├-✔ sleep(7:7)                singlecluster-parallel-6rtkc-2914639584  43s
   ├-✔ sleep(8:9)                singlecluster-parallel-6rtkc-3024028329  43s
   └-✔ sleep(9:10)               singlecluster-parallel-6rtkc-3224503614  1m

It took the workflow 1 minute 16 seconds to run on cluster1 alone. We can see that cluster1 could only run three steps concurrently, in four waves, which is less than ideal, but expected, because the 940m vCPU available are in three “bins”.

Let’s annotate our workflow’s pod template with multicluster.admiralty.io/elect="" to make it run on two clusters:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: multicluster-parallel-
spec:
  entrypoint: multicluster-parallel
  templates:
  - name: multicluster-parallel
    steps:
    - - name: sleep
        template: sleep
        withItems: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  - name: sleep
    container:
      image: busybox
      command: [sleep, 10]
      resources:
        requests:
          cpu: 100m # Note: Argo sidecar adds another 100m
    metadata:
      annotations:
        multicluster.admiralty.io/elect: ""

Submit it:

argo --context $ARGO_CLUSTER submit --serviceaccount argo-workflow --watch https://raw.githubusercontent.com/admiraltyio/multicluster-scheduler/master/config/samples/argo-workflows/blog-scenario-a-multicluster.yaml

Here’s the final state:

Duration:            31 seconds

STEP                            PODNAME                                 DURATION  MESSAGE
 ✔ multicluster-parallel-lmw2d
 └-·-✔ sleep(0:0)               multicluster-parallel-lmw2d-1353848687  12s
   ├-✔ sleep(1:1)               multicluster-parallel-lmw2d-714502387   14s
   ├-✔ sleep(2:2)               multicluster-parallel-lmw2d-894725111   14s
   ├-✔ sleep(3:3)               multicluster-parallel-lmw2d-711387939   13s
   ├-✔ sleep(4:4)               multicluster-parallel-lmw2d-479610983   14s
   ├-✔ sleep(5:5)               multicluster-parallel-lmw2d-1696675651  13s
   ├-✔ sleep(6:6)               multicluster-parallel-lmw2d-1336174783  15s
   ├-✔ sleep(7:7)               multicluster-parallel-lmw2d-2767328819  29s
   ├-✔ sleep(8:9)               multicluster-parallel-lmw2d-3117624962  29s
   └-✔ sleep(9:10)              multicluster-parallel-lmw2d-2469206667  29s

It took the workflow only 31 seconds to run across cluster1 and cluster2. Seven steps were able to run concurrently at first, followed by the three remaining steps. Notice that some of the steps were run in cluster1 and the others in cluster2:

kubectl --context $ARGO_CLUSTER get pods

outputs:

NAME                                                      READY     STATUS      RESTARTS   AGE
cluster1-default-multicluster-parallel-lmw2d-1336174783   0/2       Completed   0          4m
cluster1-default-multicluster-parallel-lmw2d-1696675651   0/2       Completed   0          4m
cluster1-default-multicluster-parallel-lmw2d-2767328819   0/2       Completed   0          4m
cluster1-default-multicluster-parallel-lmw2d-3117624962   0/2       Completed   0          4m
cluster1-default-multicluster-parallel-lmw2d-479610983    0/2       Completed   0          4m
multicluster-parallel-lmw2d-1336174783                    0/2       Completed   0          4m
multicluster-parallel-lmw2d-1353848687                    0/2       Completed   0          4m
multicluster-parallel-lmw2d-1696675651                    0/2       Completed   0          4m
multicluster-parallel-lmw2d-2469206667                    0/2       Completed   0          4m
multicluster-parallel-lmw2d-2767328819                    0/2       Completed   0          4m
multicluster-parallel-lmw2d-3117624962                    0/2       Completed   0          4m
multicluster-parallel-lmw2d-479610983                     0/2       Completed   0          4m
multicluster-parallel-lmw2d-711387939                     0/2       Completed   0          4m
multicluster-parallel-lmw2d-714502387                     0/2       Completed   0          4m
multicluster-parallel-lmw2d-894725111                     0/2       Completed   0          4m
... (and all the pods from the single-cluster workflow)

The five pods whose names are prefixed with “cluster1-default-” are delegate pods. The prefix indicates their origin. The other pods are the proxy pods.

In cluster2, there are only delegate pods:

kubectl --context $NON_ARGO_CLUSTER get pods

outputs:

NAME                                                      READY     STATUS      RESTARTS   AGE
cluster1-default-multicluster-parallel-lmw2d-1353848687   0/2       Completed   0          4m
cluster1-default-multicluster-parallel-lmw2d-2469206667   0/2       Completed   0          4m
cluster1-default-multicluster-parallel-lmw2d-711387939    0/2       Completed   0          4m
cluster1-default-multicluster-parallel-lmw2d-714502387    0/2       Completed   0          4m
cluster1-default-multicluster-parallel-lmw2d-894725111    0/2       Completed   0          4m

Scenario B: Multicluster ETL

We will model this scenario with a simple DAG workflow, where steps A and C can run in cluster1, but step B must run in cluster2; step C depends on steps A and B. Note the use of the multicluster.admiralty.io/clustername pod template annotation to enforce a placement:

#  A   B*
#   \ /
#    C
#
# * B must run in cluster2
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: multicluster-dag-
spec:
  entrypoint: multicluster-dag
  templates:
  - name: multicluster-dag
    dag:
      tasks:
      - name: A
        template: sleep
      - name: B
        template: sleep-remote
        arguments:
          parameters:
          - name: clustername
            value: cluster2 # change me
      - name: C
        dependencies: [A, B]
        template: sleep
  - name: sleep
    container:
      image: busybox
      command: [sleep, 10]
  - name: sleep-remote
    inputs:
      parameters:
      - name: clustername
    container:
      image: busybox
      command: [sleep, 10]
    metadata:
      annotations:
        multicluster.admiralty.io/elect: ""
        multicluster.admiralty.io/clustername: "{{inputs.parameters.clustername}}"

If NON_ARGO_CLUSTER is not equal to “cluster2” in your case, modify the workflow before submitting it.

argo --context $ARGO_CLUSTER submit --serviceaccount argo-workflow --watch https://raw.githubusercontent.com/admiraltyio/multicluster-scheduler/master/config/samples/argo-workflows/blog-scenario-b.yaml

Here’s the final state:

Duration:            26 seconds

STEP                       PODNAME                           DURATION  MESSAGE
 ✔ multicluster-dag-ftrwh
 ├-✔ A                     multicluster-dag-ftrwh-745251266  11s
 ├-✔ B                     multicluster-dag-ftrwh-728473647  12s
 └-✔ C                     multicluster-dag-ftrwh-711696028  12s

Note that step B was delegated to cluster2:

kubectl --context $NON_ARGO_CLUSTER get pods

outputs:

NAME                                                READY     STATUS      RESTARTS   AGE
cluster1-default-multicluster-dag-ftrwh-728473647   0/2       Completed   0          2m

In a real case scenario, you would pipe data between steps using artifact repositories and/or step input/outputs.

Discussion: Pod-Level Federation

As we’ve demonstrated, multicluster-scheduler integrates nicely with Argo. We didn’t have to modify the Argo source code, and simple annotations to the workflows’ manifests were enough to make them run across clusters. This would not have been possible with a project like Federation v2, which requires clients to use new, federated APIs, e.g., federated deployment templates, placements, and overrides. The main advantage of multicluster-scheduler is that it federates clusters at the pod level, “the smallest and simplest unit in the Kubernetes object model.” The entire Kubernetes ecosystem revolves around pods. By choosing pods as multicluster-scheduler’s unit, we’re enabling a series of “for free”, loosely coupled integrations. Multicluster Horizontal Pod Autoscaler with custom and external metrics should work out-of-the-box (we’ll prove that soon), while integrations with Istio and Knative are in the works.

Conclusions

Multicluster-scheduler can run Argo workflows across Kubernetes clusters, delegating pods to where resources are available, or as specified by the user. It can make parallel workflows run faster without scaling out clusters, and it simplifies multi-region and multicloud ETL processes. This integration was made possible by multicluster-scheduler’s architecture centered around pods, “the smallest and simplest unit in the Kubernetes object model.” The way forward is exciting and includes, among other things, more integrations with the cloud native ecosystem, and advanced scheduling. We’re curious to hear the thoughts and feedback of the community, and we welcome contributions!

Acknowledgements

Many thanks to the Argo authors for designing a great cloud-native workflow engine, and to the authors of controller-runtime, which powers a lot of the components of multicluster-scheduler.

Receive more articles like this in your inbox.