# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# [START composer_gkeoperator_airflow_1]

from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
    GKEStartPodOperator,
)
from airflow.utils.dates import days_ago


with models.DAG(
    "example_gcp_gke",
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:

    # [START composer_gke_create_cluster_airflow_1]
    # [START composer_gkeoperator_minconfig_airflow_1]
    # [START composer_gkeoperator_templateconfig_airflow_1]
    # [START composer_gkeoperator_affinity_airflow_1]
    # [START composer_gkeoperator_fullconfig_airflow_1]
    # TODO(developer): update with your values
    PROJECT_ID = "my-project-id"
    CLUSTER_ZONE = "us-west1-a"
    CLUSTER_NAME = "example-cluster"
    # [END composer_gkeoperator_minconfig_airflow_1]
    # [END composer_gkeoperator_templateconfig_airflow_1]
    # [END composer_gkeoperator_affinity_airflow_1]
    # [END composer_gkeoperator_fullconfig_airflow_1]
    CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
    # [END composer_gke_create_cluster_airflow_1]
    # [START composer_gke_create_cluster_airflow_1]
    create_cluster = GKECreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        body=CLUSTER,
    )
    # Using the BashOperator to create node pools is a workaround.
    # In Airflow 2, node pool creation can be done with the
    # GKECreateClusterOperator instead, because of
    # https://github.com/apache/airflow/pull/17820.
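    # A minimal sketch of that Airflow 2 alternative, assuming a google
    # provider version that includes the change above. The node_pools field
    # follows the google-cloud-container Cluster resource; this block is
    # illustrative only and is not used by this DAG:
    #
    #   CLUSTER_WITH_POOLS = {
    #       "name": CLUSTER_NAME,
    #       "node_pools": [
    #           {"name": "pool-0", "initial_node_count": 1},
    #           {"name": "pool-1", "initial_node_count": 1},
    #       ],
    #   }
    #   create_cluster = GKECreateClusterOperator(
    #       task_id="create_cluster",
    #       project_id=PROJECT_ID,
    #       location=CLUSTER_ZONE,
    #       body=CLUSTER_WITH_POOLS,
    #   )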

    create_node_pools = BashOperator(
        task_id="create_node_pools",
        bash_command=f"gcloud container node-pools create pool-0 \
                    --cluster {CLUSTER_NAME} \
                    --num-nodes 1 \
                    --zone {CLUSTER_ZONE} \
                    && gcloud container node-pools create pool-1 \
                    --cluster {CLUSTER_NAME} \
                    --num-nodes 1 \
                    --zone {CLUSTER_ZONE}",
    )
    # [END composer_gke_create_cluster_airflow_1]

    # [START composer_gkeoperator_minconfig_airflow_1]
    kubernetes_min_pod = GKEStartPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of the task you want to run, used to generate the Pod ID.
        name="pod-ex-minimum",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        cluster_name=CLUSTER_NAME,
        # Entrypoint of the container; if not specified, the Docker
        # container's entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes; the default namespace is
        # `default`. Pods can starve the Airflow workers and scheduler of
        # resources within the Cloud Composer environment; the recommended
        # solution is to increase the number of nodes to satisfy the
        # computing requirements. Alternatively, launching pods into a
        # custom namespace keeps them from competing with Airflow for
        # resources.
        namespace="default",
        # The Docker image to run. Images default to hub.docker.com, but a
        # fully qualified URL can point to a custom repository. Private
        # gcr.io images are supported if the Composer environment is in the
        # same project as the gcr.io images and the service account that
        # Composer uses has permission to access Google Container Registry
        # (the default service account has permission).
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )
    # [END composer_gkeoperator_minconfig_airflow_1]

    # [START composer_gkeoperator_templateconfig_airflow_1]
    kubernetes_template_ex = GKEStartPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="bash",
        # The parameters below can be templated with Jinja: cmds, arguments,
        # env_vars, and config_file. For more information, see:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container; if not specified, the Docker
        # container's entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # `ds` in Jinja is the execution date as YYYY-MM-DD; this container
        # will echo the execution date. These are the arguments to the
        # entrypoint; the Docker image's CMD is used if they are not
        # provided. The arguments parameter is templated.
        arguments=["{{ ds }}"],
        # The `var` template variable allows you to access variables defined
        # in the Airflow UI. Here we get the value of `my_value` and set the
        # environment variable `MY_VALUE`. The Pod will fail if `my_value`
        # is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    )
    # [END composer_gkeoperator_templateconfig_airflow_1]
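    # For the template example above to run, the `my_value` Variable must
    # exist. It can be created in the Airflow UI (Admin > Variables) or, as
    # a sketch assuming the Airflow 1.10 CLI, with a hypothetical value:
    #
    #   airflow variables --set my_value example_value
    #
    # (In Airflow 2, the equivalent command is `airflow variables set`.)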

    # [START composer_gkeoperator_affinity_airflow_1]
    kubernetes_affinity_ex = GKEStartPodOperator(
        task_id="ex-pod-affinity",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        cluster_name=CLUSTER_NAME,
        name="ex-pod-affinity",
        namespace="default",
        image="perl",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible
        # to be scheduled on, based on labels on the node. In this case, if
        # the label 'cloud.google.com/gke-nodepool' with value 'pool-1' is
        # not found on any node, the Pod will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means that
                # for a pod to be scheduled on a node, the node must have
                # the specified labels. However, if labels on a node change
                # at runtime such that the affinity rules on a pod are no
                # longer met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When node pools are created in Google
                                    # Kubernetes Engine, the nodes inside
                                    # that node pool are automatically
                                    # assigned the label
                                    # 'cloud.google.com/gke-nodepool' with
                                    # the value of the node pool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The values of the label key that allow
                                    # the Pod to be scheduled.
                                    "values": [
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    # [END composer_gkeoperator_affinity_airflow_1]
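    # A simple node-pool constraint like the one above can often be expressed
    # with a Kubernetes nodeSelector instead of affinity. Whether the operator
    # exposes it, and under which parameter name (`node_selectors` in older
    # versions, `node_selector` in newer cncf.kubernetes providers), depends
    # on your installed versions, so treat this as a sketch:
    #
    #   node_selectors={"cloud.google.com/gke-nodepool": "pool-1"},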
    # [START composer_gkeoperator_fullconfig_airflow_1]
    kubernetes_full_pod = GKEStartPodOperator(
        task_id="ex-all-configs",
        name="full",
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="perl",
        # Entrypoint of the container; if not specified, the Docker
        # container's entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The Docker image's CMD is used if
        # they are not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to the Pod; the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
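        # As a sketch, a Secret object can mount a Kubernetes secret as an
        # environment variable; "airflow-secrets" and "sql_alchemy_conn" are
        # hypothetical names, and the import path varies by Airflow version
        # (airflow.kubernetes.secret or airflow.contrib.kubernetes.secret):
        #
        #   from airflow.kubernetes.secret import Secret
        #   secret_env = Secret(
        #       "env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
        #   secrets=[secret_env],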
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout (in seconds) to start up the Pod; the default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container.
        # The env_vars parameter is templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If True, logs the stdout output of the container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image: 'IfNotPresent' causes the
        # kubelet to skip pulling an image that already exists on the node;
        # set it to 'Always' to always pull a new image.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the
        # Pod. They can hold a wide range of data and can include characters
        # that are not permitted in labels.
        annotations={"key1": "value1"},
        # Resource specifications for the Pod; these let you set both CPU
        # and memory limits and requests. Prior to Airflow 1.10.4, resource
        # specifications were passed as a Pod Resources Class object. If
        # using this example on a version of Airflow earlier than 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead of passing a dict. For more
        # info, see: https://github.com/apache/airflow/pull/4551
        resources={"limit_memory": "250M", "limit_cpu": "100m"},
        # If True, the content of /airflow/xcom/return.json from the
        # container is also pushed to an XCom when the container completes.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information, see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )
    # [END composer_gkeoperator_fullconfig_airflow_1]
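    # If do_xcom_push were True, a downstream task could read the JSON that
    # the container wrote to /airflow/xcom/return.json, for example in a
    # templated field (a sketch; the downstream task is hypothetical):
    #
    #   "{{ task_instance.xcom_pull(task_ids='ex-all-configs') }}"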
    # [START composer_gkeoperator_delete_cluster_airflow_1]
    delete_cluster = GKEDeleteClusterOperator(
        task_id="delete_cluster",
        name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        location=CLUSTER_ZONE,
    )
    # [END composer_gkeoperator_delete_cluster_airflow_1]

    create_cluster >> create_node_pools >> kubernetes_min_pod >> delete_cluster
    create_cluster >> create_node_pools >> kubernetes_full_pod >> delete_cluster
    create_cluster >> create_node_pools >> kubernetes_affinity_ex >> delete_cluster
    create_cluster >> create_node_pools >> kubernetes_template_ex >> delete_cluster
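    # Equivalently, the four chains above can be written with Airflow's
    # list-based dependency syntax (a sketch):
    #
    #   create_cluster >> create_node_pools
    #   create_node_pools >> [kubernetes_min_pod, kubernetes_full_pod,
    #                         kubernetes_affinity_ex, kubernetes_template_ex]
    #   [kubernetes_min_pod, kubernetes_full_pod,
    #    kubernetes_affinity_ex, kubernetes_template_ex] >> delete_cluster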

# [END composer_gkeoperator_airflow_1]