8000 Update airflow_db_cleanup.py to also clean sessions table · apilaskowski/python-docs-samples@dcfff64 · GitHub
[go: up one dir, main page]

Skip to content

Commit dcfff64

Browse files
authored
Update airflow_db_cleanup.py to also clean sessions table
1 parent 8ac4a9a commit dcfff64

File tree

1 file changed

+54
-9
lines changed

1 file changed

+54
-9
lines changed

composer/workflows/airflow_db_cleanup.py

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353

5454
import airflow
5555
from airflow import settings
56-
from airflow.jobs.base_job import BaseJob
5756
from airflow.models import (
5857
DAG,
5958
DagModel,
@@ -69,7 +68,7 @@
6968
from airflow.version import version as airflow_version
7069

7170
import dateutil.parser
72-
from sqlalchemy import and_, func
71+
from sqlalchemy import and_, func, text
7372
from sqlalchemy.exc import ProgrammingError
7473
from sqlalchemy.orm import load_only
7574

@@ -101,13 +100,6 @@
101100
# List of all the objects that will be deleted. Comment out the DB objects you
102101
# want to skip.
103102
DATABASE_OBJECTS = [
104-
{
105-
"airflow_db_model": BaseJob,
106-
"age_check_column": BaseJob.latest_heartbeat,
107-
"keep_last": False,
108-
"keep_last_filters": None,
109-
"keep_last_group_by": None,
110-
},
111103
{
112104
"airflow_db_model": DagRun,
113105
"age_check_column": DagRun.execution_date,
@@ -228,6 +220,35 @@
228220
except Exception as e:
229221
logging.error(e)
230222

223+
# Airflow 2.6.0 moved the heartbeat-bearing job model from
# airflow.jobs.base_job.BaseJob to airflow.jobs.job.Job.  Detect which one is
# available by import rather than by comparing AIRFLOW_VERSION: the original
# `AIRFLOW_VERSION < ["2", "6", "0"]` compares lists of *strings*
# lexicographically, so e.g. ["2", "10", "0"] < ["2", "6", "0"] is True
# ("10" < "6" as strings) and Airflow >= 2.10 would take the wrong branch.
try:
    try:
        # Airflow >= 2.6
        from airflow.jobs.job import Job as _job_model
    except ImportError:
        # Airflow < 2.6
        from airflow.jobs.base_job import BaseJob as _job_model
    DATABASE_OBJECTS.append(
        {
            "airflow_db_model": _job_model,
            "age_check_column": _job_model.latest_heartbeat,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None,
        }
    )
except Exception as e:
    # Match the file's best-effort style: log and continue so the DAG still
    # parses and cleans the remaining tables even if the job model is missing.
    logging.error(e)
251+
231252
default_args = {
232253
"owner": DAG_OWNER_NAME,
233254
"depends_on_past": False,
@@ -439,17 +460,41 @@ def cleanup_function(**context):
439460
finally:
440461
session.close()
441462

463+
def cleanup_sessions():
    """Delete expired webserver session rows from the ``session`` table.

    Flask-Session rows are not purged by Airflow's own cleanup, so the table
    grows without bound.  A row is expired when its ``expiry`` timestamp is in
    the PAST, i.e. ``expiry < now()`` — the original ``expiry > now()`` would
    have deleted the still-active sessions and kept the expired ones.

    NOTE(review): ``now()::timestamp(0)`` is PostgreSQL syntax; assumes the
    metadata DB is Postgres (the Cloud Composer default) — confirm before
    reusing elsewhere.
    """
    session = settings.Session()
    try:
        logging.info("Deleting expired sessions...")
        # COUNT(*) instead of fetching every row just to len() it.
        count_statement = text(
            "SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);"
        )
        before = session.execute(count_statement).scalar()
        session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);"))
        after = session.execute(count_statement).scalar()
        # Commit only on success; the original committed unconditionally,
        # even after an exception had been logged.
        session.commit()
        logging.info("Deleted %s expired sessions.", before - after)
    except Exception as e:
        logging.error(e)
    finally:
        # Always release the session, matching the file's cleanup_function
        # pattern of closing in a finally block.
        session.close()
442477

443478
def analyze_db():
    """Run ANALYZE on the metadata database to refresh query-planner statistics."""
    session = settings.Session()
    try:
        # Wrap in text(): executing a plain string is deprecated in
        # SQLAlchemy 1.4 and removed in 2.0, and the file already imports
        # `text` for the session-table cleanup.
        session.execute(text("ANALYZE"))
        session.commit()
    finally:
        # Release the connection even if ANALYZE fails.
        session.close()
447483

448484

449485
# Task wiring: purge expired webserver sessions, then refresh DB statistics.
analyze_op = PythonOperator(
    task_id="analyze_query",
    python_callable=analyze_db,
    provide_context=True,
    dag=dag,
)

cleanup_session_op = PythonOperator(
    task_id="cleanup_sessions",
    python_callable=cleanup_sessions,
    provide_context=True,
    dag=dag,
)

# Equivalent to cleanup_session_op.set_downstream(analyze_op).
cleanup_session_op >> analyze_op
497+
453498
for db_object in DATABASE_OBJECTS:
454499
cleanup_op = PythonOperator(
455500
task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),

0 commit comments

Comments
 (0)
0