Airflow GKE Operator sample (#6692) · runck014/python-docs-samples@6a602d8 · GitHub

Commit 6a602d8

Airflow GKE Operator sample (GoogleCloudPlatform#6692)
## Description

Part of cl/393442533 - this is an adaptation of the longtime Composer/Airflow fan favorite [Using the KubernetesPodOperator](https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator).

This PR...
- Adds a sample compatible with Airflow 2+
- Adds a sample compatible with the bridge versions of Airflow 1 (1.10.14/1.10.15)
- Adds the Composer Airflow 1 requirements to the "ignore" section of renovate - those are mostly in maintenance mode
- Updates the constraints file, increasing the google providers pin so that a necessary bug fix is included

If you run this in your Airflow environment, following the tutorial in that CL, your DAG will:
- Create a GKE cluster
- Run 4 tasks on it
- Delete the cluster

Successful DAG runs:
- [Airflow 1](https://screenshot.googleplex.com/zXW5wHzXxtT6T9Q)
- [Airflow 2 Composer v1](https://screenshot.googleplex.com/9PqFbjVQrj8D6vs)
- [Airflow 2 Composer 2](https://screenshot.googleplex.com/APZyRAPRpQHrCxf)

## Checklist
- [x] I have followed [Sample Guidelines from AUTHORING_GUIDE.MD](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/AUTHORING_GUIDE.md)
- [x] README is updated to include [all relevant information](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/AUTHORING_GUIDE.md#readme-file)
- [x] **Tests** pass: `nox -s py-3.6` (see [Test Environment Setup](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/AUTHORING_GUIDE.md#test-environment-setup))
- [x] **Lint** pass: `nox -s lint` (see [Test Environment Setup](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/AUTHORING_GUIDE.md#test-environment-setup))
- [ ] These samples need a new **API enabled** in testing projects to pass (let us know which ones)
- [ ] These samples need a new/updated **env vars** in testing projects set to pass (let us know which ones)
- [x] Please **merge** this PR for me once it is approved.
- [ ] This sample adds a new sample directory, and I updated the [CODEOWNERS file](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/.github/CODEOWNERS) with the codeowners for this sample
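For orientation (not part of the commit): the Airflow 1 sample in this PR wires four pod tasks between cluster creation and deletion. Below is a minimal, hypothetical sketch of just that DAG shape, using `DummyOperator` stand-ins instead of the real GKE operators; names and the DAG id here are placeholders.

```python
# Hypothetical sketch of the DAG shape only -- the real sample (see the diff
# below) uses GKECreateClusterOperator, GKEStartPodOperator, BashOperator, and
# GKEDeleteClusterOperator in place of these DummyOperator stand-ins.
from airflow import models
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with models.DAG(
    "gke_sample_shape",  # hypothetical DAG id, not the sample's
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:
    create_cluster = DummyOperator(task_id="create_cluster")
    create_node_pools = DummyOperator(task_id="create_node_pools")
    pod_tasks = [DummyOperator(task_id=f"pod_task_{i}") for i in range(4)]
    delete_cluster = DummyOperator(task_id="delete_cluster")

    # Four pod tasks fan out after the node pools exist and all feed deletion.
    for pod_task in pod_tasks:
        create_cluster >> create_node_pools >> pod_task >> delete_cluster
```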
1 parent 4d603f0 commit 6a602d8

File tree

8 files changed: +560 -5 lines changed

Lines changed: 253 additions & 0 deletions
@@ -0,0 +1,253 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# [START composer_gkeoperator_airflow_1]


from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
    GKEStartPodOperator,
)
from airflow.utils.dates import days_ago


with models.DAG(
    "example_gcp_gke",
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:

    # [START composer_gke_create_cluster_airflow_1]
    # [START composer_gkeoperator_minconfig_airflow_1]
    # [START composer_gkeoperator_templateconfig_airflow_1]
    # [START composer_gkeoperator_affinity_airflow_1]
    # [START composer_gkeoperator_fullconfig_airflow_1]
    # TODO(developer): update with your values
    PROJECT_ID = "my-project-id"
    CLUSTER_ZONE = "us-west1-a"
    CLUSTER_NAME = "example-cluster"
    # [END composer_gkeoperator_minconfig_airflow_1]
    # [END composer_gkeoperator_templateconfig_airflow_1]
    # [END composer_gkeoperator_affinity_airflow_1]
    # [END composer_gkeoperator_fullconfig_airflow_1]
    CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
    # [END composer_gke_create_cluster_airflow_1]
    # [START composer_gke_create_cluster_airflow_1]
    create_cluster = GKECreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        body=CLUSTER,
    )
    # Using the BashOperator to create node pools is a workaround
    # In Airflow 2, because of https://github.com/apache/airflow/pull/17820
    # Node pool creation can be done using the GKECreateClusterOperator

    create_node_pools = BashOperator(
        task_id="create_node_pools",
        bash_command=f"gcloud container node-pools create pool-0 \
                        --cluster {CLUSTER_NAME} \
                        --num-nodes 1 \
                        --zone {CLUSTER_ZONE} \
                        && gcloud container node-pools create pool-1 \
                        --cluster {CLUSTER_NAME} \
                        --num-nodes 1 \
                        --zone {CLUSTER_ZONE}",
    )
    # [END composer_gke_create_cluster_airflow_1]

    # [START composer_gkeoperator_minconfig_airflow_1]
    kubernetes_min_pod = GKEStartPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        cluster_name=CLUSTER_NAME,
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )
    # [END composer_gkeoperator_minconfig_airflow_1]

    # [START composer_gkeoperator_templateconfig_airflow_1]
    kubenetes_template_ex = GKEStartPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    )
    # [END composer_gkeoperator_templateconfig_airflow_1]

    # [START composer_gkeoperator_affinity_airflow_1]
    kubernetes_affinity_ex = GKEStartPodOperator(
        task_id="ex-pod-affinity",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        cluster_name=CLUSTER_NAME,
        name="ex-pod-affinity",
        namespace="default",
        image="perl",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    # [END composer_gkeoperator_affinity_airflow_1]
    # [START composer_gkeoperator_fullconfig_airflow_1]
    kubernetes_full_pod = GKEStartPodOperator(
        task_id="ex-all-configs",
        name="full",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="perl",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={"limit_memory": "250M", "limit_cpu": "100m"},
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )
    # [END composer_gkeoperator_fullconfig_airflow_1]
    # [START composer_gkeoperator_delete_cluster_airflow_1]
    delete_cluster = GKEDeleteClusterOperator(
        task_id="delete_cluster",
        name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
    )
    # [END composer_gkeoperator_delete_cluster_airflow_1]

    create_cluster >> create_node_pools >> kubernetes_min_pod >> delete_cluster
    create_cluster >> create_node_pools >> kubernetes_full_pod >> delete_cluster
    create_cluster >> create_node_pools >> kubernetes_affinity_ex >> delete_cluster
    create_cluster >> create_node_pools >> kubenetes_template_ex >> delete_cluster

# [END composer_gkeoperator_airflow_1]
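A practical note when running this DAG (not part of the commit): the `ex-kube-templates` task reads the Airflow variable `my_value` and fails if it is unset. Below is a hedged sketch of seeding it programmatically, assuming an initialized Airflow metadata database is reachable; it can equally be set in the Airflow UI under Admin > Variables.

```python
# Hypothetical one-off helper: seed the `my_value` Airflow Variable that the
# ex-kube-templates task reads via {{ var.value.my_value }}.
# Assumes the Airflow metadata database is initialized and reachable.
from airflow.models import Variable

Variable.set("my_value", "example-value")
print(Variable.get("my_value"))  # sanity check: prints "example-value"
```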
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import internal_unit_testing


def test_dag_import():
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a recommended confidence check by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """
    from . import gke_operator as module

    internal_unit_testing.assert_has_valid_dag(module)
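The `internal_unit_testing` helper above is specific to this repository. Outside it, a roughly equivalent confidence check can be written against Airflow's own `DagBag`; this is a hedged sketch that assumes the DAG file lives in `composer/airflow_1_samples/`.

```python
# Hypothetical stand-alone variant of the import test, using Airflow's DagBag
# instead of the repo-internal internal_unit_testing helper.
from airflow.models import DagBag


def test_gke_operator_dag_parses():
    # Parse only the sample folder; skip Airflow's bundled example DAGs.
    dag_bag = DagBag(dag_folder="composer/airflow_1_samples", include_examples=False)
    assert not dag_bag.import_errors  # no parse/import failures
    assert "example_gcp_gke" in dag_bag.dags  # DAG id defined in the sample
```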

composer/airflow_1_samples/requirements.txt

Lines changed: 4 additions & 2 deletions
@@ -1,5 +1,7 @@
-apache-airflow[gcp]==1.10.14
-kubernetes==18.20.0
+apache-airflow[gcp]==1.10.15
+apache-airflow-backport-providers-google==2021.3.3
+apache-airflow-backport-providers-cncf-kubernetes==2021.3.3
+kubernetes==11.0.0
 scipy==1.4.1; python_version > '3.0'
 scipy==1.2.3; python_version < '3.0'
 numpy==1.19.5; python_version > '3.0'
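For context (not part of the commit): on Airflow 1.10.x, the `airflow.providers.google.cloud...` import path used by the sample is supplied by the backport provider packages pinned above. A hypothetical smoke check that the pins resolve in a local virtualenv:

```python
# Hypothetical smoke check: with the pinned requirements installed, the GKE
# operators that the Airflow 1 sample imports should resolve via the backport
# provider packages.
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
    GKEStartPodOperator,
)

print(GKEStartPodOperator.__module__)  # confirms which package provides it
```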

composer/workflows/constraints.txt

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ apache-airflow-providers-elasticsearch==2.0.2
 apache-airflow-providers-exasol==2.0.0
 apache-airflow-providers-facebook==2.0.0
 apache-airflow-providers-ftp==2.0.0
-apache-airflow-providers-google==4.0.0
+apache-airflow-providers-google==5.1.0
 apache-airflow-providers-grpc==2.0.0
 apache-airflow-providers-hashicorp==2.0.0
 apache-airflow-providers-http==2.0.0

0 commit comments
