Running Argo Workflows Across Multiple Kubernetes Clusters

(updated April 9, 2020) We recently open-sourced multicluster-scheduler, a system of Kubernetes controllers that intelligently schedules workloads across clusters. In this blog post, we will use it with Argo to run multicluster workflows (pipelines, DAGs, ETLs) that better utilize resources and/or combine data from different regions or clouds.

UPDATE (2020-04-09) - Multicluster-scheduler has changed quite a bit since this blog post was originally published. Multicluster-scheduler is now a virtual-kubelet provider; observations and decisions were replaced by more direct control loops. The integration with Argo Workflows discussed here still works. Multicluster-scheduler also works with Argo CD, as shown in this ITNEXT blog post published today by Gokul Chandra.

Most enterprises that use Kubernetes manage multiple clusters. For various reasons, you may have one or several clusters per team, region, environment, or combination thereof. Your clusters may be hosted by different cloud providers—in a multicloud infrastructure—and/or on premises—in a hybrid infrastructure. The benefits of isolation, however, come at the expense of, among other things, reduced bin-packing efficiency and data fragmentation. Let's explore two scenarios.

  • Scenario A: You need to run a large parallel workflow, e.g., a machine learning training pipeline, which requires more resources than available in your team's cluster. You could scale out to go fast, or limit parallelism to save money. In the meantime, available resources in other teams' clusters stay idle. Multicluster-scheduler allows you to elect pods to be delegated to other clusters, where resources are available, from the comfort of your own cluster, with a single pod or pod template annotation.
  • Scenario B: You need to run a workflow that combines data from multiple clouds or regions. It is either more efficient or required to run some steps closer to their data sources. To optimize throughput or save on data egress charges, you may want to compress or aggregate the data before loading the results closer to you. Or, to respect privacy regulations and minimize your attack surface, you may want to anonymize the data as far upstream as possible. You could deploy remote services or functions and call them from your workflow, but that would be complicated. Multicluster-scheduler allows you to simply specify which cluster a pod should run in, again, with a single pod or pod template annotation.

A Multicluster Pod's Journey

Here's a quick summary of a multicluster pod's journey.

  1. When a pod is created with the multicluster.admiralty.io/elect="" annotation, the multicluster-scheduler agent's mutating pod admission webhook replaces the pod's containers with a dummy busybox container that just waits to be killed. The original spec is saved for later as another annotation.
  2. We call the resulting pod a proxy pod. The agent then sends an observation of the proxy pod to the scheduler's cluster, which can be the same cluster. The agent also watches other pods, nodes, and node pools and sends observations of them to the scheduler's cluster to guide its decisions.
  3. The scheduler creates a delegate pod decision in its own cluster. If the original pod was annotated with multicluster.admiralty.io/clustername=foo, the delegate pod decision is targeted at cluster "foo". Otherwise, the scheduler targets the cluster that could accommodate the most replicas of our pod, based on current observations. More advanced scheduling options are in the works.
  4. The agent in the target cluster sees the decision and creates a delegate pod. The delegate pod has the same spec as the original pod.
  5. An observation of the delegate pod is sent back to the scheduler's cluster. When the delegate pod is annotated, the same annotation is fed back to the proxy pod (e.g., so Argo can read step outputs), and when it succeeds or dies, a signal is sent to the proxy pod's container for it to succeed or die too.

For more details, check out the README.

Demonstration

Let's see that in action. Create two clusters, e.g., with Minikube or your favorite cloud provider. In this blog post, we'll assume their associated contexts in your kubeconfig are "cluster1" and "cluster2", but we'll use variables so you can use your own:

CLUSTER1=cluster1 # change me
CLUSTER2=cluster2 # change me

Now, following the installation guide, install the scheduler in cluster1 and the agent in both clusters.

We also need Argo in either cluster. We'll use cluster1 in this guide, but feel free to change the variables:

ARGO_CLUSTER=$CLUSTER1 # change me
NON_ARGO_CLUSTER=$CLUSTER2 # change me

Install the Argo controller and UI (step 2 of the Argo getting started guide):

kubectl --context $ARGO_CLUSTER create ns argo
kubectl --context $ARGO_CLUSTER apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/stable/manifests/install.yaml

We're not big fans of giving Argo pods admin privileges, as recommended in step 3 of the Argo getting started guide, so we'll use a minimal service account instead. Because pods will run in the two clusters, we need this service account in both:

ARGO_POD_RBAC=https://raw.githubusercontent.com/admiraltyio/admiralty/master/examples/argo-workflows/_service-account.yaml
kubectl --context $ARGO_CLUSTER apply -f $ARGO_POD_RBAC
kubectl --context $NON_ARGO_CLUSTER apply -f $ARGO_POD_RBAC

Let's also install the Argo CLI locally—although it's optional—to nicely submit and track workflows:

# On Mac:
brew install argoproj/tap/argo
# On Linux:
curl -sSL -o /usr/local/bin/argo https://github.com/argoproj/argo/releases/download/v2.2.1/argo-linux-amd64
chmod +x /usr/local/bin/argo
# On Windows:
curl -sSL -o argo https://github.com/argoproj/argo/releases/download/v2.2.1/argo-windows-amd64 # and add to your PATH

(UPDATE, 2019-03-17, multicluster-scheduler v0.3) Finally, let's enable multicluster-scheduler in the default namespace:

kubectl --context $ARGO_CLUSTER label ns default multicluster-scheduler=enabled

You can now turn any Argo workflow into a multicluster workflow by adding multicluster.admiralty.io annotations to its pod templates. Also, don't forget to specify resource requests if you want the scheduler to decide where to run your pods.
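Before moving on to workflows, it can help to try the annotation on a single pod. The following is a minimal sketch, not part of the multicluster-scheduler samples: the function name elected_pod_manifest and the pod name elected-sleep are hypothetical, and only the two multicluster.admiralty.io annotations come from this post. The function prints a pod manifest; passing a cluster name as the first argument adds the clustername annotation, otherwise placement is left to the scheduler.

```shell
#!/bin/sh
# Print a manifest for a single pod elected for multicluster scheduling.
# $1 (optional): target cluster name; when omitted, the scheduler picks a cluster.
# The pod name "elected-sleep" is a hypothetical example.
elected_pod_manifest() {
  cat <<'EOF'
apiVersion: v1
kind: Pod
metadata:
  name: elected-sleep
  annotations:
    multicluster.admiralty.io/elect: ""
EOF
  # Only pin the pod to a cluster when a name was given.
  if [ -n "${1:-}" ]; then
    printf '    multicluster.admiralty.io/clustername: "%s"\n' "$1"
  fi
  cat <<'EOF'
spec:
  restartPolicy: Never
  containers:
  - name: sleep
    image: busybox
    command: [sleep, "10"]
    resources:
      requests:
        cpu: 100m # a request gives the scheduler something to decide on
EOF
}
```

To try it, pipe the output to kubectl, e.g., elected_pod_manifest | kubectl --context "$ARGO_CLUSTER" apply -f -, then list pods in both clusters.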

Scenario A: Optimizing a Large Parallel Workflow

A default GKE cluster has three nodes, with 1 vCPU and 3.75GB of memory each, out of which 940m vCPU and 2.58GiB of memory are allocatable. The system pods, along with multicluster-scheduler and Argo already request 1840m vCPU in cluster1 and 1740m vCPU in cluster2. Therefore, cluster1 has 980m vCPU available and cluster2 has 1080m. We don't need to spend extra money for this experiment: we will model a "large parallel workflow" by 10 parallel steps requiring 200m vCPU each (including 100m for the Argo sidecar).
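The arithmetic behind those numbers can be reproduced with a few lines of shell. This is only a sketch using the figures quoted above; the variable and function names are ours. Keep in mind that cluster-wide totals hide how the free capacity is split across nodes, so they are an upper bound on what can actually be scheduled.

```shell
#!/bin/sh
# Figures from the text, in millicores (m) of vCPU.
NODES=3
ALLOCATABLE_MCPU_PER_NODE=940
STEPS=10
MCPU_PER_STEP=200 # 100m for the step container + 100m for the Argo sidecar

# available_mcpu REQUESTED: cluster-wide allocatable vCPU minus what is already requested.
available_mcpu() {
  echo $(( NODES * ALLOCATABLE_MCPU_PER_NODE - $1 ))
}

CLUSTER1_FREE=$(available_mcpu 1840)
CLUSTER2_FREE=$(available_mcpu 1740)
DEMAND=$(( STEPS * MCPU_PER_STEP ))

echo "cluster1 free: ${CLUSTER1_FREE}m"
echo "cluster2 free: ${CLUSTER2_FREE}m"
echo "workflow demand: ${DEMAND}m"
```

The workflow asks for more than twice what cluster1 has left, which is why the steps cannot all run at once there.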

First, let's run the following single-cluster workflow (also available in the multicluster-scheduler samples directory):

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: singlecluster-parallel-
spec:
  entrypoint: singlecluster-parallel
  templates:
  - name: singlecluster-parallel
    steps:
    - - name: sleep
        template: sleep
        withItems: [0, 1, 2, 3, 4, 5, 6, 7, 9, 10]
  - name: sleep
    container:
      image: busybox
      command: [sleep, 10]
      resources:
        requests:
          cpu: 100m # Note: Argo sidecar adds another 100m

Submit it:

argo --context $ARGO_CLUSTER submit --serviceaccount argo-workflow --watch https://raw.githubusercontent.com/admiraltyio/admiralty/master/examples/argo-workflows/blog-scenario-a-singlecluster.yaml

Here's the final state:

Duration: 1 minute 16 seconds
STEP PODNAME DURATION MESSAGE
✔ singlecluster-parallel-6rtkc
└-·-✔ sleep(0:0) singlecluster-parallel-6rtkc-839758060 11s
├-✔ sleep(1:1) singlecluster-parallel-6rtkc-1823198064 12s
├-✔ sleep(2:2) singlecluster-parallel-6rtkc-4064072188 11s
├-✔ sleep(3:3) singlecluster-parallel-6rtkc-2040401880 27s
├-✔ sleep(4:4) singlecluster-parallel-6rtkc-3078784476 27s
├-✔ sleep(5:5) singlecluster-parallel-6rtkc-3529283624 27s
├-✔ sleep(6:6) singlecluster-parallel-6rtkc-3081898924 43s
├-✔ sleep(7:7) singlecluster-parallel-6rtkc-2914639584 43s
├-✔ sleep(8:9) singlecluster-parallel-6rtkc-3024028329 43s
└-✔ sleep(9:10) singlecluster-parallel-6rtkc-3224503614 1m

It took the workflow 1 minute 16 seconds to run on cluster1 alone. We can see that cluster1 could only run three steps concurrently, in four waves, which is less than ideal, but expected, because the 980m vCPU available are in three "bins".
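The number of waves follows from a ceiling division. Here is a small sketch (the waves function is ours), assuming every wave is full except possibly the last; the concurrency of three is the value observed above, not something the snippet derives.

```shell
#!/bin/sh
# waves STEPS CONCURRENCY: number of waves needed to run STEPS steps
# when at most CONCURRENCY of them fit at the same time (ceiling division).
waves() {
  echo $(( ($1 + $2 - 1) / $2 ))
}

echo "10 steps, 3 at a time: $(waves 10 3) waves"
```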

Let's annotate our workflow's pod template with multicluster.admiralty.io/elect="" to make it run on two clusters:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: multicluster-parallel-
spec:
  entrypoint: multicluster-parallel
  templates:
  - name: multicluster-parallel
    steps:
    - - name: sleep
        template: sleep
        withItems: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  - name: sleep
    container:
      image: busybox
      command: [sleep, 10]
      resources:
        requests:
          cpu: 100m # Note: Argo sidecar adds another 100m
    metadata:
      annotations:
        multicluster.admiralty.io/elect: ""

Submit it:

argo --context $ARGO_CLUSTER submit --serviceaccount argo-workflow --watch https://raw.githubusercontent.com/admiraltyio/admiralty/master/examples/argo-workflows/blog-scenario-a-multicluster.yaml

Here's the final state:

Duration: 31 seconds
STEP PODNAME DURATION MESSAGE
✔ multicluster-parallel-lmw2d
└-·-✔ sleep(0:0) multicluster-parallel-lmw2d-1353848687 12s
├-✔ sleep(1:1) multicluster-parallel-lmw2d-714502387 14s
├-✔ sleep(2:2) multicluster-parallel-lmw2d-894725111 14s
├-✔ sleep(3:3) multicluster-parallel-lmw2d-711387939 13s
├-✔ sleep(4:4) multicluster-parallel-lmw2d-479610983 14s
├-✔ sleep(5:5) multicluster-parallel-lmw2d-1696675651 13s
├-✔ sleep(6:6) multicluster-parallel-lmw2d-1336174783 15s
├-✔ sleep(7:7) multicluster-parallel-lmw2d-2767328819 29s
├-✔ sleep(8:9) multicluster-parallel-lmw2d-3117624962 29s
└-✔ sleep(9:10) multicluster-parallel-lmw2d-2469206667 29s

It took the workflow only 31 seconds to run across cluster1 and cluster2. Seven steps were able to run concurrently at first, followed by the three remaining steps. Notice that some of the steps were run in cluster1 and the others in cluster2:

kubectl --context $ARGO_CLUSTER get pods

outputs:

NAME READY STATUS RESTARTS AGE
cluster1-default-multicluster-parallel-lmw2d-1336174783 0/2 Completed 0 4m
cluster1-default-multicluster-parallel-lmw2d-1696675651 0/2 Completed 0 4m
cluster1-default-multicluster-parallel-lmw2d-2767328819 0/2 Completed 0 4m
cluster1-default-multicluster-parallel-lmw2d-3117624962 0/2 Completed 0 4m
cluster1-default-multicluster-parallel-lmw2d-479610983 0/2 Completed 0 4m
multicluster-parallel-lmw2d-1336174783 0/2 Completed 0 4m
multicluster-parallel-lmw2d-1353848687 0/2 Completed 0 4m
multicluster-parallel-lmw2d-1696675651 0/2 Completed 0 4m
multicluster-parallel-lmw2d-2469206667 0/2 Completed 0 4m
multicluster-parallel-lmw2d-2767328819 0/2 Completed 0 4m
multicluster-parallel-lmw2d-3117624962 0/2 Completed 0 4m
multicluster-parallel-lmw2d-479610983 0/2 Completed 0 4m
multicluster-parallel-lmw2d-711387939 0/2 Completed 0 4m
multicluster-parallel-lmw2d-714502387 0/2 Completed 0 4m
multicluster-parallel-lmw2d-894725111 0/2 Completed 0 4m
... (and all the pods from the single-cluster workflow)

The five pods whose names are prefixed with "cluster1-default-" are delegate pods. The prefix indicates their origin. The other pods are the proxy pods.

In cluster2, there are only delegate pods:

kubectl --context $NON_ARGO_CLUSTER get pods

outputs:

NAME READY STATUS RESTARTS AGE
cluster1-default-multicluster-parallel-lmw2d-1353848687 0/2 Completed 0 4m
cluster1-default-multicluster-parallel-lmw2d-2469206667 0/2 Completed 0 4m
cluster1-default-multicluster-parallel-lmw2d-711387939 0/2 Completed 0 4m
cluster1-default-multicluster-parallel-lmw2d-714502387 0/2 Completed 0 4m
cluster1-default-multicluster-parallel-lmw2d-894725111 0/2 Completed 0 4m
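The naming convention visible in the two listings makes it easy to map a delegate pod back to its proxy pod. The sketch below assumes the prefix is exactly the origin cluster name, a dash, the origin namespace, and a dash, as observed in this demo; that is an inference from the output above, not a documented contract, and the function name proxy_name_of is ours.

```shell
#!/bin/sh
# proxy_name_of DELEGATE_POD ORIGIN_CLUSTER ORIGIN_NAMESPACE
# Prints the proxy pod name for a delegate pod, assuming delegate pods are
# named "<origin cluster>-<origin namespace>-<proxy pod name>".
# Returns 1 (and prints nothing) if the name does not carry that prefix.
proxy_name_of() {
  name=$1
  prefix="$2-$3-"
  case "$name" in
    "$prefix"*)
      rest=${name#"$prefix"}
      printf '%s\n' "$rest"
      ;;
    *)
      return 1
      ;;
  esac
}

proxy_name_of cluster1-default-multicluster-parallel-lmw2d-1353848687 cluster1 default
```

Combined with kubectl get pods -o name in the target cluster, this gives the list of proxy pods to look up in the Argo cluster.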

Scenario B: Multicluster ETL

We will model this scenario with a simple DAG workflow, where steps A and C can run in cluster1, but step B must run in cluster2; step C depends on steps A and B. Note the use of the multicluster.admiralty.io/clustername pod template annotation to enforce a placement:

#   A   B*
#    \ /
#     C
#
# * B must run in cluster2
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: multicluster-dag-
spec:
  entrypoint: multicluster-dag
  templates:
  - name: multicluster-dag
    dag:
      tasks:
      - name: A
        template: sleep
      - name: B
        template: sleep-remote
        arguments:
          parameters:
          - name: clustername
            value: cluster2 # change me
      - name: C
        dependencies: [A, B]
        template: sleep
  - name: sleep
    container:
      image: busybox
      command: [sleep, 10]
  - name: sleep-remote
    inputs:
      parameters:
      - name: clustername
    container:
      image: busybox
      command: [sleep, 10]
    metadata:
      annotations:
        multicluster.admiralty.io/elect: ""
        multicluster.admiralty.io/clustername: "{{inputs.parameters.clustername}}"

If NON_ARGO_CLUSTER is not equal to "cluster2" in your case, modify the workflow before submitting it.
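One way to make that change without opening an editor is a small sed filter. This is a sketch under two assumptions: the hosted file contains the literal text "value: cluster2", as in the listing above, and your cluster name contains no characters that are special to sed (such as "/" or "&"). The function name retarget_cluster is ours.

```shell
#!/bin/sh
# retarget_cluster NAME: read a workflow manifest on stdin and write it to
# stdout with the hard-coded "cluster2" parameter value replaced by NAME.
retarget_cluster() {
  sed "s/value: cluster2/value: $1/"
}

# Example on a single line of the manifest:
printf 'value: cluster2 # change me\n' | retarget_cluster my-other-cluster
```

In practice, you would download the manifest with curl -sSL, pipe it through retarget_cluster "$NON_ARGO_CLUSTER" into a local file, and pass that file to argo submit in place of the URL.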

argo --context $ARGO_CLUSTER submit --serviceaccount argo-workflow --watch https://raw.githubusercontent.com/admiraltyio/admiralty/master/examples/argo-workflows/blog-scenario-b.yaml

Here's the final state:

Duration: 26 seconds
STEP PODNAME DURATION MESSAGE
✔ multicluster-dag-ftrwh
├-✔ A multicluster-dag-ftrwh-745251266 11s
├-✔ B multicluster-dag-ftrwh-728473647 12s
└-✔ C multicluster-dag-ftrwh-711696028 12s

Note that step B was delegated to cluster2:

kubectl --context $NON_ARGO_CLUSTER get pods

outputs:

NAME READY STATUS RESTARTS AGE
cluster1-default-multicluster-dag-ftrwh-728473647 0/2 Completed 0 2m

In a real-world scenario, you would pipe data between steps using artifact repositories and/or step inputs and outputs.

Discussion: Pod-Level Federation

As we've demonstrated, multicluster-scheduler integrates nicely with Argo. We didn't have to modify the Argo source code, and simple annotations to the workflows' manifests were enough to make them run across clusters. This would not have been possible with a project like Federation v2, which requires clients to use new, federated APIs, e.g., federated deployment templates, placements, and overrides. The main advantage of multicluster-scheduler is that it federates clusters at the pod level, "the smallest and simplest unit in the Kubernetes object model." The entire Kubernetes ecosystem revolves around pods. By choosing pods as multicluster-scheduler's unit, we're enabling a series of "for free", loosely coupled integrations. Multicluster Horizontal Pod Autoscaler with custom and external metrics should work out-of-the-box (we'll prove that soon), while integrations with Istio and Knative are in the works.

Conclusions

Multicluster-scheduler can run Argo workflows across Kubernetes clusters, delegating pods to where resources are available, or as specified by the user. It can make parallel workflows run faster without scaling out clusters, and it simplifies multi-region and multicloud ETL processes. This integration was made possible by multicluster-scheduler's architecture centered around pods, "the smallest and simplest unit in the Kubernetes object model." The way forward is exciting and includes, among other things, more integrations with the cloud native ecosystem, and advanced scheduling. We're curious to hear the thoughts and feedback of the community, and we welcome contributions!

Acknowledgements

Many thanks to the Argo authors for designing a great cloud-native workflow engine, and to the authors of controller-runtime, which powers a lot of the components of multicluster-scheduler.