|
53 | 53 |
|
54 | 54 | import airflow
|
55 | 55 | from airflow import settings
|
56 |
| -from airflow.jobs.base_job import BaseJob |
57 | 56 | from airflow.models import (
|
58 | 57 | DAG,
|
59 | 58 | DagModel,
|
|
69 | 68 | from airflow.version import version as airflow_version
|
70 | 69 |
|
71 | 70 | import dateutil.parser
|
72 |
| -from sqlalchemy import and_, func |
| 71 | +from sqlalchemy import and_, func, text |
73 | 72 | from sqlalchemy.exc import ProgrammingError
|
74 | 73 | from sqlalchemy.orm import load_only
|
75 | 74 |
|
|
101 | 100 | # List of all the objects that will be deleted. Comment out the DB objects you
|
102 | 101 | # want to skip.
|
103 | 102 | DATABASE_OBJECTS = [
|
104 |
| - { |
105 |
| - "airflow_db_model": BaseJob, |
106 |
| - "age_check_column": BaseJob.latest_heartbeat, |
107 |
| - "keep_last": False, |
108 |
| - "keep_last_filters": None, |
109 |
| - "keep_last_group_by": None, |
110 |
| - }, |
111 | 103 | {
|
112 | 104 | "airflow_db_model": DagRun,
|
113 | 105 | "age_check_column": DagRun.execution_date,
|
|
228 | 220 | except Exception as e:
|
229 | 221 | logging.error(e)
|
230 | 222 |
|
def _version_tuple(parts):
    """Convert split version components (e.g. ["2", "10", "0"]) to an int tuple.

    Comparing the raw list of strings is lexicographic, so
    ["2", "10", "0"] < ["2", "6", "0"] would (wrongly) be True and
    Airflow 2.10+ would take the pre-2.6 code path.  Numeric tuples
    compare correctly.  A non-digit suffix (e.g. "0rc1") is truncated at
    the first non-digit so it still parses.
    """
    nums = []
    for part in parts:
        digits = ""
        for ch in part:
            if not ch.isdigit():
                break
            digits += ch
        nums.append(int(digits) if digits else 0)
    return tuple(nums)


# Airflow 2.6.0 removed BaseJob (airflow.jobs.base_job) in favour of Job
# (airflow.jobs.job); register whichever model this installation provides.
if _version_tuple(AIRFLOW_VERSION) < (2, 6, 0):
    try:
        from airflow.jobs.base_job import BaseJob
        DATABASE_OBJECTS.append(
            {
                "airflow_db_model": BaseJob,
                "age_check_column": BaseJob.latest_heartbeat,
                "keep_last": False,
                "keep_last_filters": None,
                "keep_last_group_by": None,
            }
        )
    except Exception as e:
        # Best-effort: skip job cleanup rather than break the whole DAG.
        logging.error(e)
else:
    try:
        from airflow.jobs.job import Job
        DATABASE_OBJECTS.append(
            {
                "airflow_db_model": Job,
                "age_check_column": Job.latest_heartbeat,
                "keep_last": False,
                "keep_last_filters": None,
                "keep_last_group_by": None,
            }
        )
    except Exception as e:
        # Best-effort: skip job cleanup rather than break the whole DAG.
        logging.error(e)
| 251 | + |
231 | 252 | default_args = {
|
232 | 253 | "owner": DAG_OWNER_NAME,
|
233 | 254 | "depends_on_past": False,
|
@@ -439,17 +460,41 @@ def cleanup_function(**context):
|
439 | 460 | finally:
|
440 | 461 | session.close()
|
441 | 462 |
|
def cleanup_sessions():
    """Delete expired webserver sessions from the ``session`` table.

    Airflow exposes no ORM model for the Flask session table, so raw SQL is
    used.  NOTE: ``now()::timestamp(0)`` is PostgreSQL-specific syntax — this
    task assumes a Postgres metadata database.
    """
    session = settings.Session()
    try:
        logging.info("Deleting sessions...")
        # BUG FIX: the condition was ``expiry > now()``, which matches
        # sessions expiring in the *future* — i.e. it deleted live sessions
        # and kept the expired ones.  An expired session has expiry < now().
        # COUNT(*) replaces fetching every row just to take len() of it.
        count_sql = text(
            "SELECT COUNT(*) FROM session WHERE expiry < now()::timestamp(0);"
        )
        before = session.execute(count_sql).scalar()
        session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);"))
        after = session.execute(count_sql).scalar()
        # Commit only when the statements succeeded (the original committed
        # unconditionally, even after a failure).
        session.commit()
        logging.info("Deleted {} expired sessions.".format(before - after))
    except Exception as e:
        logging.error(e)
    finally:
        # Always release the DB session, even on error.
        session.close()
442 | 477 |
|
def analyze_db():
    """Run ANALYZE so the query planner's statistics reflect the bulk deletes."""
    session = settings.Session()
    try:
        # Wrap the statement in text(): passing a raw string to execute() is
        # deprecated in SQLAlchemy 1.4 and removed in 2.0 (text is already
        # imported at the top of this file).
        session.execute(text("ANALYZE"))
        session.commit()
    finally:
        # Always release the DB session, even if ANALYZE fails.
        session.close()
447 | 483 |
|
448 | 484 |
|
# Final task: refresh planner statistics once cleanup has finished.
analyze_op = PythonOperator(
    task_id="analyze_query",
    python_callable=analyze_db,
    provide_context=True,
    dag=dag,
)

# Purge expired webserver sessions; runs before the ANALYZE pass.
cleanup_session_op = PythonOperator(
    task_id="cleanup_sessions",
    python_callable=cleanup_sessions,
    provide_context=True,
    dag=dag,
)

# Equivalent to cleanup_session_op.set_downstream(analyze_op).
cleanup_session_op >> analyze_op
| 497 | + |
453 | 498 | for db_object in DATABASE_OBJECTS:
|
454 | 499 | cleanup_op = PythonOperator(
|
455 | 500 | task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
|
|
0 commit comments