Skip to content

Fix the function provided to the ExternalTaskSensor

browser_metrics_daily DAG's ExternalTaskSensor accepts a function that generates a list of execution dates, which does not work at the moment.

This MR provides a more robust way of generating that function, along with appropriate unit test.

Unfortunately this can't be easily tested in a dev environment due to dependency on another DAG, but even so the task logs show a different behavior between the old version of the code and the new one:

New execution dates function produces the following log:

[2024-04-18, 20:40:18 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table manual__2024-04-18T20:40:09.824358+00:00 [queued]>
[2024-04-18, 20:40:18 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table manual__2024-04-18T20:40:09.824358+00:00 [queued]>
[2024-04-18, 20:40:18 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 6
[2024-04-18, 20:40:18 UTC] {taskinstance.py:1382} INFO - Executing <Task(ExternalTaskSensor): wait_for_browser_general_iceberg_table> on 2024-04-18 20:40:09.824358+00:00
[2024-04-18, 20:40:18 UTC] {standard_task_runner.py:57} INFO - Started process 21577 to run task
[2024-04-18, 20:40:18 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'browser_metrics_weekly', 'wait_for_browser_general_iceberg_table', 'manual__2024-04-18T20:40:09.824358+00:00', '--job-id', '4', '--raw', '--subdir', 'DAGS_FOLDER/report/browser/browser_metrics_weekly_dag.py', '--cfg-path', '/tmp/tmpwg_kgqda']
[2024-04-18, 20:40:18 UTC] {standard_task_runner.py:85} INFO - Job 4: Subtask wait_for_browser_general_iceberg_table
[2024-04-18, 20:40:18 UTC] {task_command.py:416} INFO - Running <TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table manual__2024-04-18T20:40:09.824358+00:00 [running]> on host stat1005.eqiad.wmnet
[2024-04-18, 20:40:18 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='data-engineering-alerts@lists.wikimedia.org' AIRFLOW_CTX_DAG_OWNER='analytics-privatedata' AIRFLOW_CTX_DAG_ID='browser_metrics_weekly' AIRFLOW_CTX_TASK_ID='wait_for_browser_general_iceberg_table' AIRFLOW_CTX_EXECUTION_DATE='2024-04-18T20:40:09.824358+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-18T20:40:09.824358+00:00'
[2024-04-18, 20:40:18 UTC] {external_task.py:244} INFO - Poking for tasks ['summarize_traffic_stats_iceberg'] in dag browser_general_daily on 2024-04-18T20:40:09.824358+00:00,2024-04-19T20:40:09.824358+00:00,2024-04-20T20:40:09.824358+00:00,2024-04-21T20:40:09.824358+00:00,2024-04-22T20:40:09.824358+00:00,2024-04-23T20:40:09.824358+00:00,2024-04-24T20:40:09.824358+00:00 ...
[2024-04-18, 20:40:18 UTC] {taskinstance.py:1897} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-04-18, 20:40:18 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2024-04-18, 20:40:19 UTC] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-04-18, 20:57:52 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table manual__2024-04-18T20:40:09.824358+00:00 [queued]>
[2024-04-18, 20:57:52 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table manual__2024-04-18T20:40:09.824358+00:00 [queued]>
[2024-04-18, 20:57:52 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 6
[2024-04-18, 20:57:52 UTC] {taskinstance.py:1382} INFO - Executing <Task(ExternalTaskSensor): wait_for_browser_general_iceberg_table> on 2024-04-18 20:40:09.824358+00:00
[2024-04-18, 20:57:52 UTC] {standard_task_runner.py:57} INFO - Started process 29187 to run task
[2024-04-18, 20:57:52 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'browser_metrics_weekly', 'wait_for_browser_general_iceberg_table', 'manual__2024-04-18T20:40:09.824358+00:00', '--job-id', '21', '--raw', '--subdir', 'DAGS_FOLDER/report/browser/browser_metrics_weekly_dag.py', '--cfg-path', '/tmp/tmp5peno5k2']
[2024-04-18, 20:57:52 UTC] {standard_task_runner.py:85} INFO - Job 21: Subtask wait_for_browser_general_iceberg_table
[2024-04-18, 20:57:52 UTC] {task_command.py:416} INFO - Running <TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table manual__2024-04-18T20:40:09.824358+00:00 [running]> on host stat1005.eqiad.wmnet
[2024-04-18, 20:57:52 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='data-engineering-alerts@lists.wikimedia.org' AIRFLOW_CTX_DAG_OWNER='analytics-privatedata' AIRFLOW_CTX_DAG_ID='browser_metrics_weekly' AIRFLOW_CTX_TASK_ID='wait_for_browser_general_iceberg_table' AIRFLOW_CTX_EXECUTION_DATE='2024-04-18T20:40:09.824358+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-04-18T20:40:09.824358+00:00'
[2024-04-18, 20:57:52 UTC] {external_task.py:244} INFO - Poking for tasks ['summarize_traffic_stats_iceberg'] in dag browser_general_daily on 2024-04-18T20:40:09.824358+00:00,2024-04-19T20:40:09.824358+00:00,2024-04-20T20:40:09.824358+00:00,2024-04-21T20:40:09.824358+00:00,2024-04-22T20:40:09.824358+00:00,2024-04-23T20:40:09.824358+00:00,2024-04-24T20:40:09.824358+00:00 ...
[2024-04-18, 20:57:52 UTC] {taskinstance.py:1897} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2024-04-18, 20:57:52 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2024-04-18, 20:57:52 UTC] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check

Old version produces the following log:

[2024-04-18, 08:06:15 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table scheduled__2024-04-07T00:00:00+00:00 [queued]> [2024-04-18, 08:06:15 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table scheduled__2024-04-07T00:00:00+00:00 [queued]> [2024-04-18, 08:06:15 UTC] {taskinstance.py:1361} INFO - Starting attempt 6 of 6 [2024-04-18, 08:06:15 UTC] {taskinstance.py:1382} INFO - Executing <Task(ExternalTaskSensor): wait_for_browser_general_iceberg_table> on 2024-04-07 00:00:00+00:00 [2024-04-18, 08:06:15 UTC] {standard_task_runner.py:57} INFO - Started process 23249 to run task [2024-04-18, 08:06:15 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'browser_metrics_weekly', 'wait_for_browser_general_iceberg_table', 'scheduled__2024-04-07T00:00:00+00:00', '--job-id', '4443129', '--raw', '--subdir', 'DAGS_FOLDER/report/browser/browser_metrics_weekly_dag.py', '--cfg-path', '/tmp/tmpmmoglxqd'] [2024-04-18, 08:06:15 UTC] {standard_task_runner.py:85} INFO - Job 4443129: Subtask wait_for_browser_general_iceberg_table [2024-04-18, 08:06:15 UTC] {task_command.py:416} INFO - Running <TaskInstance: browser_metrics_weekly.wait_for_browser_general_iceberg_table scheduled__2024-04-07T00:00:00+00:00 [running]> on host an-launcher1002.eqiad.wmnet [2024-04-18, 08:06:16 UTC] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='data-engineering-alerts@lists.wikimedia.org' AIRFLOW_CTX_DAG_OWNER='analytics' AIRFLOW_CTX_DAG_ID='browser_metrics_weekly' AIRFLOW_CTX_TASK_ID='wait_for_browser_general_iceberg_table' AIRFLOW_CTX_EXECUTION_DATE='2024-04-07T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='6' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-07T00:00:00+00:00' [2024-04-18, 10:05:26 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code -9 [2024-04-18, 10:05:26 UTC] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check

Merge request reports