From a8aa1a79276fc68cccf65356ff970571f9e920f4 Mon Sep 17 00:00:00 2001 From: Marcel Ruiz Forns Date: Wed, 14 Sep 2022 23:19:56 +0200 Subject: [PATCH 1/6] Add unique devices DAGs --- .../unique_devices_per_domain_daily_dag.py | 113 +++++++++++++++++ .../unique_devices_per_domain_monthly_dag.py | 112 +++++++++++++++++ ...ue_devices_per_project_family_daily_dag.py | 114 ++++++++++++++++++ ..._devices_per_project_family_monthly_dag.py | 113 +++++++++++++++++ 4 files changed, 452 insertions(+) create mode 100644 analytics/dags/unique_devices/per_domain/unique_devices_per_domain_daily_dag.py create mode 100644 analytics/dags/unique_devices/per_domain/unique_devices_per_domain_monthly_dag.py create mode 100644 analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag.py create mode 100644 analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag.py diff --git a/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_daily_dag.py b/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_daily_dag.py new file mode 100644 index 0000000..61c6c9d --- /dev/null +++ b/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_daily_dag.py @@ -0,0 +1,113 @@ +''' +Calculates metrics about unique devices per domain on a daily basis. +The metrics are stored in a Hive table and also as CSV archive files. +''' + +from airflow import DAG +from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor +from analytics.config.dag_config import ( + archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) +from datetime import datetime, timedelta +from wmf_airflow_common.config.variable_properties import VariableProperties +from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator +from wmf_airflow_common.operators.spark import SparkSqlOperator +from wmf_airflow_common.partitions_builder import daily_partitions +from wmf_airflow_common.templates.time_filters import filters + +dag_id = 'unique_devices_per_domain_daily' +var_props = VariableProperties(f'{dag_id}_config') + +with DAG( + dag_id=dag_id, + doc_md=__doc__, + start_date=var_props.get_datetime('start_date', datetime(2022, 9, 15)), + schedule_interval='@daily', + tags=['unique_devices', 'archive'], + user_defined_filters=filters, + default_args=var_props.get_merged('default_args', { + **spark3_default_args, + 'sla': timedelta(hours=6), + }), +) as dag: + + # The pageview actor table to use as a source. + source_table = 'wmf.pageview_actor' + + # The unique_devices_per_domain_daily table to use as intermediate destination. + intermediate_table = var_props.get('intermediate_table', + 'wmf.unique_devices_per_domain_daily' + ) + + # A temporary directory to store gzipped files. + temporary_directory = var_props.get('temporary_directory', + f'{hdfs_temp_directory}/{dag_id}/{{{{ds}}}}' + ) + + # The final destination path for the archived dumps. + destination_path = var_props.get('destination_path', + archive_directory + '/unique_devices/per_domain/{{execution_date.year}}' + + '/{{execution_date|to_ds_month}}/unique_devices_per_domain_daily-{{ds}}.gz' + ) + + # Sensor to wait for the pageview actor partitions. + sensor = NamedHivePartitionSensor( + task_id='wait_for_pageview_actor', + partition_names=daily_partitions( + table=source_table, + granularity='@hourly', + ), + poke_interval=timedelta(minutes=20).total_seconds(), + ) + + # SQL operator for the computation of the metrics. 
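+ # Assumed behaviour (the HQL file itself is not part of this patch): the referenced
+ # query reads the day's hourly pageview_actor partitions, writes one partition of the
+ # intermediate Hive table, and receives the query_parameters below as Hive variables;
+ # with coalesce_partitions=1 the output presumably lands in a single file.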
+ compute = SparkSqlOperator( + task_id='compute_uniques_metrics', + sql=var_props.get('compute_hql_path', + f'{hql_directory}/unique_devices/per_domain/unique_devices_per_domain_daily.hql' + ), + query_parameters={ + 'source_table': source_table, + 'destination_table': intermediate_table, + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'day': '{{execution_date.day}}', + 'coalesce_partitions': 1, + }, + driver_cores=1, + driver_memory='4G', + executor_cores=2, + executor_memory='8G', + conf={ + 'spark.dynamicAllocation.maxExecutors': 32, + 'spark.yarn.executor.memoryOverhead': 2048, + 'spark.sql.shuffle.partitions': 512, + }, + ) + + # SQL operator for the dump of the metrics. + dump = SparkSqlOperator( + task_id='dump_uniques_metrics', + sql=var_props.get('dump_hql_path', + f'{hql_directory}/unique_devices/per_domain' + + '/unique_devices_per_domain_daily_to_archive.hql' + ), + query_parameters={ + 'source_table': intermediate_table, + 'destination_directory': temporary_directory, + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'day': '{{execution_date.day}}', + }, + ) + + # Archive operator to cleanly move the dump to its final destination. + archive = HDFSArchiveOperator( + task_id='move_data_to_archive', + source_directory=temporary_directory, + check_done=True, + archive_file=destination_path, + archive_parent_umask='027', + archive_perms='640', + ) + + sensor >> compute >> dump >> archive diff --git a/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_monthly_dag.py b/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_monthly_dag.py new file mode 100644 index 0000000..dec7cd8 --- /dev/null +++ b/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_monthly_dag.py @@ -0,0 +1,112 @@ +''' +Calculates metrics about unique devices per domain on a monthly basis. +The metrics are stored in a Hive table and also as CSV archive files. +''' + +from airflow import DAG +from analytics.config.dag_config import ( + archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) +from datetime import datetime, timedelta +from wmf_airflow_common.config.variable_properties import VariableProperties +from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator +from wmf_airflow_common.operators.spark import SparkSqlOperator +from wmf_airflow_common.partitions_builder import daily_partitions +from wmf_airflow_common.sensors.hive import RangeHivePartitionSensor +from wmf_airflow_common.templates.time_filters import filters + +dag_id = 'unique_devices_per_domain_monthly' +var_props = VariableProperties(f'{dag_id}_config') + +with DAG( + dag_id=dag_id, + doc_md=__doc__, + start_date=var_props.get_datetime('start_date', datetime(2022, 9, 1)), + schedule_interval='@monthly', + tags=['unique_devices', 'archive'], + user_defined_filters=filters, + default_args=var_props.get_merged('default_args', { + **spark3_default_args, + 'sla': timedelta(days=1), + }), +) as dag: + + # The pageview actor table to use as a source. + source_table = 'wmf.pageview_actor' + + # The unique_devices_per_domain_monthly table to use as intermediate destination. + intermediate_table = var_props.get('intermediate_table', + 'wmf.unique_devices_per_domain_monthly' + ) + + # A temporary directory to store gzipped files. + temporary_directory = var_props.get('temporary_directory', + f'{hdfs_temp_directory}/{dag_id}/{{{{execution_date|to_ds_month}}}}' + ) + + # The final destination path for the archived dumps. 
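+ # Illustrative example only: for the 2022-09 run this template presumably renders to
+ # <archive_directory>/unique_devices/per_domain/2022/2022-09/unique_devices_per_domain_monthly-2022-09.gz,
+ # assuming the to_ds_month filter formats the execution date as YYYY-MM.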
+ destination_path = var_props.get('destination_path', + archive_directory + '/unique_devices/per_domain' + + '/{{execution_date.year}}/{{execution_date|to_ds_month}}' + + '/unique_devices_per_domain_monthly-{{execution_date|to_ds_month}}.gz' + ) + + # Sensor to wait for the pageview actor partitions. + sensor = RangeHivePartitionSensor( + task_id='wait_for_pageview_actor', + table_name=source_table, + from_timestamp='{{execution_date|start_of_current_month}}', + to_timestamp='{{execution_date|start_of_next_month}}', + granularity='@hourly', + poke_interval=timedelta(hours=1).total_seconds(), + ) + + # SQL operator for the computation of the metrics. + compute = SparkSqlOperator( + task_id='compute_uniques_metrics', + sql=var_props.get('compute_hql_path', + f'{hql_directory}/unique_devices/per_domain/unique_devices_per_domain_monthly.hql' + ), + query_parameters={ + 'source_table': source_table, + 'destination_table': intermediate_table, + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'coalesce_partitions': 1, + }, + driver_cores=1, + driver_memory='4G', + executor_cores=4, + executor_memory='16G', + conf={ + 'spark.dynamicAllocation.maxExecutors': 32, + 'spark.yarn.executor.memoryOverhead': 2048, + 'spark.sql.shuffle.partitions': 512, + }, + ) + + # SQL operator for the dump of the metrics. + dump = SparkSqlOperator( + task_id='dump_uniques_metrics', + sql=var_props.get('dump_hql_path', + f'{hql_directory}/unique_devices/per_domain' + + '/unique_devices_per_domain_monthly_to_archive.hql' + ), + query_parameters={ + 'source_table': intermediate_table, + 'destination_directory': temporary_directory, + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + }, + ) + + # Archive operator to cleanly move the dump to its final destination. + archive = HDFSArchiveOperator( + task_id='move_data_to_archive', + source_directory=temporary_directory, + check_done=True, + archive_file=destination_path, + archive_parent_umask='027', + archive_perms='640', + ) + + sensor >> compute >> dump >> archive diff --git a/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag.py b/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag.py new file mode 100644 index 0000000..90b0c15 --- /dev/null +++ b/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag.py @@ -0,0 +1,114 @@ +''' +Calculates metrics about unique devices per project family on a daily basis. +The metrics are stored in a Hive table and also as CSV archive files. 
+''' + +from airflow import DAG +from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor +from analytics.config.dag_config import ( + archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) +from datetime import datetime, timedelta +from wmf_airflow_common.config.variable_properties import VariableProperties +from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator +from wmf_airflow_common.operators.spark import SparkSqlOperator +from wmf_airflow_common.partitions_builder import daily_partitions +from wmf_airflow_common.templates.time_filters import filters + +dag_id = 'unique_devices_per_project_family_daily' +var_props = VariableProperties(f'{dag_id}_config') + +with DAG( + dag_id=dag_id, + doc_md=__doc__, + start_date=var_props.get_datetime('start_date', datetime(2022, 9, 15)), + schedule_interval='@daily', + tags=['unique_devices', 'archive'], + user_defined_filters=filters, + default_args=var_props.get_merged('default_args', { + **spark3_default_args, + 'sla': timedelta(hours=6), + }), +) as dag: + + # The pageview actor table to use as a source. + source_table = 'wmf.pageview_actor' + + # The unique_devices_per_domain_daily table to use as intermediate destination. + intermediate_table = var_props.get('intermediate_table', + 'wmf.unique_devices_per_project_family_daily' + ) + + # A temporary directory to store gzipped files. + temporary_directory = var_props.get('temporary_directory', + f'{hdfs_temp_directory}/{dag_id}/{{{{ds}}}}' + ) + + # The final destination path for the archived dumps. + destination_path = var_props.get('destination_path', + archive_directory + '/unique_devices/per_project_family/{{execution_date.year}}' + + '/{{execution_date|to_ds_month}}/unique_devices_per_project_family_daily-{{ds}}.gz' + ) + + # Sensor to wait for the pageview actor partitions. + sensor = NamedHivePartitionSensor( + task_id='wait_for_pageview_actor', + partition_names=daily_partitions( + table=source_table, + granularity='@hourly', + ), + poke_interval=timedelta(minutes=20).total_seconds(), + ) + + # SQL operator for the computation of the metrics. + compute = SparkSqlOperator( + task_id='compute_uniques_metrics', + sql=var_props.get('compute_hql_path', + f'{hql_directory}/unique_devices/per_project_family' + + '/unique_devices_per_project_family_daily.hql' + ), + query_parameters={ + 'source_table': source_table, + 'destination_table': intermediate_table, + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'day': '{{execution_date.day}}', + 'coalesce_partitions': 1, + }, + driver_cores=1, + driver_memory='4G', + executor_cores=2, + executor_memory='8G', + conf={ + 'spark.dynamicAllocation.maxExecutors': 32, + 'spark.yarn.executor.memoryOverhead': 2048, + 'spark.sql.shuffle.partitions': 512, + }, + ) + + # SQL operator for the dump of the metrics. + dump = SparkSqlOperator( + task_id='dump_uniques_metrics', + sql=var_props.get('dump_hql_path', + f'{hql_directory}/unique_devices/per_project_family' + + '/unique_devices_per_project_family_daily_to_archive.hql' + ), + query_parameters={ + 'source_table': intermediate_table, + 'destination_directory': temporary_directory, + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'day': '{{execution_date.day}}', + }, + ) + + # Archive operator to cleanly move the dump to its final destination. 
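+ # archive_perms='640' keeps the archived file owner-writable and group-readable only,
+ # and archive_parent_umask='027' applies the matching restriction to any created parent
+ # directories; check_done=True presumably makes the operator require a done/_SUCCESS
+ # flag in the temporary directory before moving the dump.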
+ archive = HDFSArchiveOperator( + task_id='move_data_to_archive', + source_directory=temporary_directory, + check_done=True, + archive_file=destination_path, + archive_parent_umask='027', + archive_perms='640', + ) + + sensor >> compute >> dump >> archive diff --git a/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag.py b/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag.py new file mode 100644 index 0000000..5121708 --- /dev/null +++ b/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag.py @@ -0,0 +1,113 @@ +''' +Calculates metrics about unique devices per project family on a monthly basis. +The metrics are stored in a Hive table and also as CSV archive files. +''' + +from airflow import DAG +from analytics.config.dag_config import ( + archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) +from datetime import datetime, timedelta +from wmf_airflow_common.config.variable_properties import VariableProperties +from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator +from wmf_airflow_common.operators.spark import SparkSqlOperator +from wmf_airflow_common.partitions_builder import daily_partitions +from wmf_airflow_common.sensors.hive import RangeHivePartitionSensor +from wmf_airflow_common.templates.time_filters import filters + +dag_id = 'unique_devices_per_project_family_monthly' +var_props = VariableProperties(f'{dag_id}_config') + +with DAG( + dag_id=dag_id, + doc_md=__doc__, + start_date=var_props.get_datetime('start_date', datetime(2022, 9, 1)), + schedule_interval='@monthly', + tags=['unique_devices', 'archive'], + user_defined_filters=filters, + default_args=var_props.get_merged('default_args', { + **spark3_default_args, + 'sla': timedelta(days=1), + }), +) as dag: + + # The pageview actor table to use as a source. + source_table = 'wmf.pageview_actor' + + # The unique_devices_per_project_family_monthly table to use as intermediate destination. + intermediate_table = var_props.get('intermediate_table', + 'wmf.unique_devices_per_project_family_monthly' + ) + + # A temporary directory to store gzipped files. + temporary_directory = var_props.get('temporary_directory', + f'{hdfs_temp_directory}/{dag_id}/{{{{execution_date|to_ds_month}}}}' + ) + + # The final destination path for the archived dumps. + destination_path = var_props.get('destination_path', + archive_directory + '/unique_devices/per_project_family' + + '/{{execution_date.year}}/{{execution_date|to_ds_month}}' + + '/unique_devices_per_project_family_monthly-{{execution_date|to_ds_month}}.gz' + ) + + # Sensor to wait for the pageview actor partitions. + sensor = RangeHivePartitionSensor( + task_id='wait_for_pageview_actor', + table_name=source_table, + from_timestamp='{{execution_date|start_of_current_month}}', + to_timestamp='{{execution_date|start_of_next_month}}', + granularity='@hourly', + poke_interval=timedelta(hours=1).total_seconds(), + ) + + # SQL operator for the computation of the metrics. 
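+ # Note: the monthly jobs request larger executors (4 cores / 16G) than the daily
+ # ones (2 cores / 8G), since they scan a full month of hourly source partitions.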
+ compute = SparkSqlOperator( + task_id='compute_uniques_metrics', + sql=var_props.get('compute_hql_path', + f'{hql_directory}/unique_devices/per_project_family' + + '/unique_devices_per_project_family_monthly.hql' + ), + query_parameters={ + 'source_table': source_table, + 'destination_table': intermediate_table, + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'coalesce_partitions': 1, + }, + driver_cores=1, + driver_memory='4G', + executor_cores=4, + executor_memory='16G', + conf={ + 'spark.dynamicAllocation.maxExecutors': 32, + 'spark.yarn.executor.memoryOverhead': 2048, + 'spark.sql.shuffle.partitions': 512, + }, + ) + + # SQL operator for the dump of the metrics. + dump = SparkSqlOperator( + task_id='dump_uniques_metrics', + sql=var_props.get('dump_hql_path', + f'{hql_directory}/unique_devices/per_project_family' + + '/unique_devices_per_project_family_monthly_to_archive.hql' + ), + query_parameters={ + 'source_table': intermediate_table, + 'destination_directory': temporary_directory, + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + }, + ) + + # Archive operator to cleanly move the dump to its final destination. + archive = HDFSArchiveOperator( + task_id='move_data_to_archive', + source_directory=temporary_directory, + check_done=True, + archive_file=destination_path, + archive_parent_umask='027', + archive_perms='640', + ) + + sensor >> compute >> dump >> archive -- GitLab From 5aa22cead34c7fbff7a8d424f6e351e71966d614 Mon Sep 17 00:00:00 2001 From: Marcel Ruiz Forns Date: Fri, 16 Sep 2022 20:52:22 +0200 Subject: [PATCH 2/6] add unit tests --- .../unique_devices_per_domain_daily_dag_test.py | 12 ++++++++++++ .../unique_devices_per_domain_monthly_dag_test.py | 12 ++++++++++++ ...ique_devices_per_project_family_daily_dag_test.py | 12 ++++++++++++ ...ue_devices_per_project_family_monthly_dag_test.py | 12 ++++++++++++ 4 files changed, 48 insertions(+) create mode 100644 tests/analytics/unique_devices/per_domain/unique_devices_per_domain_daily_dag_test.py create mode 100644 tests/analytics/unique_devices/per_domain/unique_devices_per_domain_monthly_dag_test.py create mode 100644 tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag_test.py create mode 100644 tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag_test.py diff --git a/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_daily_dag_test.py b/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_daily_dag_test.py new file mode 100644 index 0000000..c2373b7 --- /dev/null +++ b/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_daily_dag_test.py @@ -0,0 +1,12 @@ +import pytest + +@pytest.fixture(name='dag_path') +def fixture_dagpath(): + return ['analytics', 'dags', 'unique_devices', 'per_domain', + 'unique_devices_per_domain_daily_dag.py'] + +def test_unique_devices_per_domain_daily_dag_loaded(airflow, dagbag): + assert dagbag.import_errors == {} + dag = dagbag.get_dag(dag_id="unique_devices_per_domain_daily") + assert dag is not None + assert len(dag.tasks) == 4 diff --git a/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_monthly_dag_test.py b/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_monthly_dag_test.py new file mode 100644 index 0000000..7ab42f1 --- /dev/null +++ b/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_monthly_dag_test.py @@ -0,0 +1,12 @@ +import pytest + 
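+# The dag_path fixture is presumably consumed by the shared airflow/dagbag fixtures
+# defined elsewhere in the test suite, so that only this DAG file is loaded.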
+@pytest.fixture(name='dag_path') +def fixture_dagpath(): + return ['analytics', 'dags', 'unique_devices', 'per_domain', + 'unique_devices_per_domain_monthly_dag.py'] + +def test_unique_devices_per_domain_monthly_dag_loaded(airflow, dagbag): + assert dagbag.import_errors == {} + dag = dagbag.get_dag(dag_id="unique_devices_per_domain_monthly") + assert dag is not None + assert len(dag.tasks) == 4 diff --git a/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag_test.py b/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag_test.py new file mode 100644 index 0000000..3deb84a --- /dev/null +++ b/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag_test.py @@ -0,0 +1,12 @@ +import pytest + +@pytest.fixture(name='dag_path') +def fixture_dagpath(): + return ['analytics', 'dags', 'unique_devices', 'per_project_family', + 'unique_devices_per_project_family_daily_dag.py'] + +def test_unique_devices_per_project_family_daily_dag_loaded(airflow, dagbag): + assert dagbag.import_errors == {} + dag = dagbag.get_dag(dag_id="unique_devices_per_project_family_daily") + assert dag is not None + assert len(dag.tasks) == 4 diff --git a/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag_test.py b/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag_test.py new file mode 100644 index 0000000..69d1237 --- /dev/null +++ b/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag_test.py @@ -0,0 +1,12 @@ +import pytest + +@pytest.fixture(name='dag_path') +def fixture_dagpath(): + return ['analytics', 'dags', 'unique_devices', 'per_project_family', + 'unique_devices_per_project_family_monthly_dag.py'] + +def test_unique_devices_per_project_family_monthly_dag_loaded(airflow, dagbag): + assert dagbag.import_errors == {} + dag = dagbag.get_dag(dag_id="unique_devices_per_project_family_monthly") + assert dag is not None + assert len(dag.tasks) == 4 -- GitLab From 753d6e824a93aa026aa0b3d726107ce3d82c92dc Mon Sep 17 00:00:00 2001 From: Marcel Ruiz Forns Date: Thu, 22 Sep 2022 23:02:10 +0200 Subject: [PATCH 3/6] merge DAGs by granularity --- .../unique_devices_per_domain_daily_dag.py | 113 -------------- .../unique_devices_per_domain_monthly_dag.py | 112 -------------- ...ue_devices_per_project_family_daily_dag.py | 114 -------------- ..._devices_per_project_family_monthly_dag.py | 113 -------------- .../unique_devices_daily_dag.py | 136 +++++++++++++++++ .../unique_devices_monthly_dag.py | 139 ++++++++++++++++++ 6 files changed, 275 insertions(+), 452 deletions(-) delete mode 100644 analytics/dags/unique_devices/per_domain/unique_devices_per_domain_daily_dag.py delete mode 100644 analytics/dags/unique_devices/per_domain/unique_devices_per_domain_monthly_dag.py delete mode 100644 analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag.py delete mode 100644 analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag.py create mode 100644 analytics/dags/unique_devices/unique_devices_daily_dag.py create mode 100644 analytics/dags/unique_devices/unique_devices_monthly_dag.py diff --git a/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_daily_dag.py b/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_daily_dag.py deleted file mode 100644 index 61c6c9d..0000000 --- 
a/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_daily_dag.py +++ /dev/null @@ -1,113 +0,0 @@ -''' -Calculates metrics about unique devices per domain on a daily basis. -The metrics are stored in a Hive table and also as CSV archive files. -''' - -from airflow import DAG -from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor -from analytics.config.dag_config import ( - archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) -from datetime import datetime, timedelta -from wmf_airflow_common.config.variable_properties import VariableProperties -from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator -from wmf_airflow_common.operators.spark import SparkSqlOperator -from wmf_airflow_common.partitions_builder import daily_partitions -from wmf_airflow_common.templates.time_filters import filters - -dag_id = 'unique_devices_per_domain_daily' -var_props = VariableProperties(f'{dag_id}_config') - -with DAG( - dag_id=dag_id, - doc_md=__doc__, - start_date=var_props.get_datetime('start_date', datetime(2022, 9, 15)), - schedule_interval='@daily', - tags=['unique_devices', 'archive'], - user_defined_filters=filters, - default_args=var_props.get_merged('default_args', { - **spark3_default_args, - 'sla': timedelta(hours=6), - }), -) as dag: - - # The pageview actor table to use as a source. - source_table = 'wmf.pageview_actor' - - # The unique_devices_per_domain_daily table to use as intermediate destination. - intermediate_table = var_props.get('intermediate_table', - 'wmf.unique_devices_per_domain_daily' - ) - - # A temporary directory to store gzipped files. - temporary_directory = var_props.get('temporary_directory', - f'{hdfs_temp_directory}/{dag_id}/{{{{ds}}}}' - ) - - # The final destination path for the archived dumps. - destination_path = var_props.get('destination_path', - archive_directory + '/unique_devices/per_domain/{{execution_date.year}}' + - '/{{execution_date|to_ds_month}}/unique_devices_per_domain_daily-{{ds}}.gz' - ) - - # Sensor to wait for the pageview actor partitions. - sensor = NamedHivePartitionSensor( - task_id='wait_for_pageview_actor', - partition_names=daily_partitions( - table=source_table, - granularity='@hourly', - ), - poke_interval=timedelta(minutes=20).total_seconds(), - ) - - # SQL operator for the computation of the metrics. - compute = SparkSqlOperator( - task_id='compute_uniques_metrics', - sql=var_props.get('compute_hql_path', - f'{hql_directory}/unique_devices/per_domain/unique_devices_per_domain_daily.hql' - ), - query_parameters={ - 'source_table': source_table, - 'destination_table': intermediate_table, - 'year': '{{execution_date.year}}', - 'month': '{{execution_date.month}}', - 'day': '{{execution_date.day}}', - 'coalesce_partitions': 1, - }, - driver_cores=1, - driver_memory='4G', - executor_cores=2, - executor_memory='8G', - conf={ - 'spark.dynamicAllocation.maxExecutors': 32, - 'spark.yarn.executor.memoryOverhead': 2048, - 'spark.sql.shuffle.partitions': 512, - }, - ) - - # SQL operator for the dump of the metrics. 
- dump = SparkSqlOperator( - task_id='dump_uniques_metrics', - sql=var_props.get('dump_hql_path', - f'{hql_directory}/unique_devices/per_domain' + - '/unique_devices_per_domain_daily_to_archive.hql' - ), - query_parameters={ - 'source_table': intermediate_table, - 'destination_directory': temporary_directory, - 'year': '{{execution_date.year}}', - 'month': '{{execution_date.month}}', - 'day': '{{execution_date.day}}', - }, - ) - - # Archive operator to cleanly move the dump to its final destination. - archive = HDFSArchiveOperator( - task_id='move_data_to_archive', - source_directory=temporary_directory, - check_done=True, - archive_file=destination_path, - archive_parent_umask='027', - archive_perms='640', - ) - - sensor >> compute >> dump >> archive diff --git a/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_monthly_dag.py b/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_monthly_dag.py deleted file mode 100644 index dec7cd8..0000000 --- a/analytics/dags/unique_devices/per_domain/unique_devices_per_domain_monthly_dag.py +++ /dev/null @@ -1,112 +0,0 @@ -''' -Calculates metrics about unique devices per domain on a monthly basis. -The metrics are stored in a Hive table and also as CSV archive files. -''' - -from airflow import DAG -from analytics.config.dag_config import ( - archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) -from datetime import datetime, timedelta -from wmf_airflow_common.config.variable_properties import VariableProperties -from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator -from wmf_airflow_common.operators.spark import SparkSqlOperator -from wmf_airflow_common.partitions_builder import daily_partitions -from wmf_airflow_common.sensors.hive import RangeHivePartitionSensor -from wmf_airflow_common.templates.time_filters import filters - -dag_id = 'unique_devices_per_domain_monthly' -var_props = VariableProperties(f'{dag_id}_config') - -with DAG( - dag_id=dag_id, - doc_md=__doc__, - start_date=var_props.get_datetime('start_date', datetime(2022, 9, 1)), - schedule_interval='@monthly', - tags=['unique_devices', 'archive'], - user_defined_filters=filters, - default_args=var_props.get_merged('default_args', { - **spark3_default_args, - 'sla': timedelta(days=1), - }), -) as dag: - - # The pageview actor table to use as a source. - source_table = 'wmf.pageview_actor' - - # The unique_devices_per_domain_monthly table to use as intermediate destination. - intermediate_table = var_props.get('intermediate_table', - 'wmf.unique_devices_per_domain_monthly' - ) - - # A temporary directory to store gzipped files. - temporary_directory = var_props.get('temporary_directory', - f'{hdfs_temp_directory}/{dag_id}/{{{{execution_date|to_ds_month}}}}' - ) - - # The final destination path for the archived dumps. - destination_path = var_props.get('destination_path', - archive_directory + '/unique_devices/per_domain' + - '/{{execution_date.year}}/{{execution_date|to_ds_month}}' + - '/unique_devices_per_domain_monthly-{{execution_date|to_ds_month}}.gz' - ) - - # Sensor to wait for the pageview actor partitions. - sensor = RangeHivePartitionSensor( - task_id='wait_for_pageview_actor', - table_name=source_table, - from_timestamp='{{execution_date|start_of_current_month}}', - to_timestamp='{{execution_date|start_of_next_month}}', - granularity='@hourly', - poke_interval=timedelta(hours=1).total_seconds(), - ) - - # SQL operator for the computation of the metrics. 
- compute = SparkSqlOperator( - task_id='compute_uniques_metrics', - sql=var_props.get('compute_hql_path', - f'{hql_directory}/unique_devices/per_domain/unique_devices_per_domain_monthly.hql' - ), - query_parameters={ - 'source_table': source_table, - 'destination_table': intermediate_table, - 'year': '{{execution_date.year}}', - 'month': '{{execution_date.month}}', - 'coalesce_partitions': 1, - }, - driver_cores=1, - driver_memory='4G', - executor_cores=4, - executor_memory='16G', - conf={ - 'spark.dynamicAllocation.maxExecutors': 32, - 'spark.yarn.executor.memoryOverhead': 2048, - 'spark.sql.shuffle.partitions': 512, - }, - ) - - # SQL operator for the dump of the metrics. - dump = SparkSqlOperator( - task_id='dump_uniques_metrics', - sql=var_props.get('dump_hql_path', - f'{hql_directory}/unique_devices/per_domain' + - '/unique_devices_per_domain_monthly_to_archive.hql' - ), - query_parameters={ - 'source_table': intermediate_table, - 'destination_directory': temporary_directory, - 'year': '{{execution_date.year}}', - 'month': '{{execution_date.month}}', - }, - ) - - # Archive operator to cleanly move the dump to its final destination. - archive = HDFSArchiveOperator( - task_id='move_data_to_archive', - source_directory=temporary_directory, - check_done=True, - archive_file=destination_path, - archive_parent_umask='027', - archive_perms='640', - ) - - sensor >> compute >> dump >> archive diff --git a/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag.py b/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag.py deleted file mode 100644 index 90b0c15..0000000 --- a/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag.py +++ /dev/null @@ -1,114 +0,0 @@ -''' -Calculates metrics about unique devices per project family on a daily basis. -The metrics are stored in a Hive table and also as CSV archive files. -''' - -from airflow import DAG -from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor -from analytics.config.dag_config import ( - archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) -from datetime import datetime, timedelta -from wmf_airflow_common.config.variable_properties import VariableProperties -from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator -from wmf_airflow_common.operators.spark import SparkSqlOperator -from wmf_airflow_common.partitions_builder import daily_partitions -from wmf_airflow_common.templates.time_filters import filters - -dag_id = 'unique_devices_per_project_family_daily' -var_props = VariableProperties(f'{dag_id}_config') - -with DAG( - dag_id=dag_id, - doc_md=__doc__, - start_date=var_props.get_datetime('start_date', datetime(2022, 9, 15)), - schedule_interval='@daily', - tags=['unique_devices', 'archive'], - user_defined_filters=filters, - default_args=var_props.get_merged('default_args', { - **spark3_default_args, - 'sla': timedelta(hours=6), - }), -) as dag: - - # The pageview actor table to use as a source. - source_table = 'wmf.pageview_actor' - - # The unique_devices_per_domain_daily table to use as intermediate destination. - intermediate_table = var_props.get('intermediate_table', - 'wmf.unique_devices_per_project_family_daily' - ) - - # A temporary directory to store gzipped files. - temporary_directory = var_props.get('temporary_directory', - f'{hdfs_temp_directory}/{dag_id}/{{{{ds}}}}' - ) - - # The final destination path for the archived dumps. 
- destination_path = var_props.get('destination_path', - archive_directory + '/unique_devices/per_project_family/{{execution_date.year}}' + - '/{{execution_date|to_ds_month}}/unique_devices_per_project_family_daily-{{ds}}.gz' - ) - - # Sensor to wait for the pageview actor partitions. - sensor = NamedHivePartitionSensor( - task_id='wait_for_pageview_actor', - partition_names=daily_partitions( - table=source_table, - granularity='@hourly', - ), - poke_interval=timedelta(minutes=20).total_seconds(), - ) - - # SQL operator for the computation of the metrics. - compute = SparkSqlOperator( - task_id='compute_uniques_metrics', - sql=var_props.get('compute_hql_path', - f'{hql_directory}/unique_devices/per_project_family' + - '/unique_devices_per_project_family_daily.hql' - ), - query_parameters={ - 'source_table': source_table, - 'destination_table': intermediate_table, - 'year': '{{execution_date.year}}', - 'month': '{{execution_date.month}}', - 'day': '{{execution_date.day}}', - 'coalesce_partitions': 1, - }, - driver_cores=1, - driver_memory='4G', - executor_cores=2, - executor_memory='8G', - conf={ - 'spark.dynamicAllocation.maxExecutors': 32, - 'spark.yarn.executor.memoryOverhead': 2048, - 'spark.sql.shuffle.partitions': 512, - }, - ) - - # SQL operator for the dump of the metrics. - dump = SparkSqlOperator( - task_id='dump_uniques_metrics', - sql=var_props.get('dump_hql_path', - f'{hql_directory}/unique_devices/per_project_family' + - '/unique_devices_per_project_family_daily_to_archive.hql' - ), - query_parameters={ - 'source_table': intermediate_table, - 'destination_directory': temporary_directory, - 'year': '{{execution_date.year}}', - 'month': '{{execution_date.month}}', - 'day': '{{execution_date.day}}', - }, - ) - - # Archive operator to cleanly move the dump to its final destination. - archive = HDFSArchiveOperator( - task_id='move_data_to_archive', - source_directory=temporary_directory, - check_done=True, - archive_file=destination_path, - archive_parent_umask='027', - archive_perms='640', - ) - - sensor >> compute >> dump >> archive diff --git a/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag.py b/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag.py deleted file mode 100644 index 5121708..0000000 --- a/analytics/dags/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag.py +++ /dev/null @@ -1,113 +0,0 @@ -''' -Calculates metrics about unique devices per project family on a monthly basis. -The metrics are stored in a Hive table and also as CSV archive files. 
-''' - -from airflow import DAG -from analytics.config.dag_config import ( - archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) -from datetime import datetime, timedelta -from wmf_airflow_common.config.variable_properties import VariableProperties -from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator -from wmf_airflow_common.operators.spark import SparkSqlOperator -from wmf_airflow_common.partitions_builder import daily_partitions -from wmf_airflow_common.sensors.hive import RangeHivePartitionSensor -from wmf_airflow_common.templates.time_filters import filters - -dag_id = 'unique_devices_per_project_family_monthly' -var_props = VariableProperties(f'{dag_id}_config') - -with DAG( - dag_id=dag_id, - doc_md=__doc__, - start_date=var_props.get_datetime('start_date', datetime(2022, 9, 1)), - schedule_interval='@monthly', - tags=['unique_devices', 'archive'], - user_defined_filters=filters, - default_args=var_props.get_merged('default_args', { - **spark3_default_args, - 'sla': timedelta(days=1), - }), -) as dag: - - # The pageview actor table to use as a source. - source_table = 'wmf.pageview_actor' - - # The unique_devices_per_project_family_monthly table to use as intermediate destination. - intermediate_table = var_props.get('intermediate_table', - 'wmf.unique_devices_per_project_family_monthly' - ) - - # A temporary directory to store gzipped files. - temporary_directory = var_props.get('temporary_directory', - f'{hdfs_temp_directory}/{dag_id}/{{{{execution_date|to_ds_month}}}}' - ) - - # The final destination path for the archived dumps. - destination_path = var_props.get('destination_path', - archive_directory + '/unique_devices/per_project_family' + - '/{{execution_date.year}}/{{execution_date|to_ds_month}}' + - '/unique_devices_per_project_family_monthly-{{execution_date|to_ds_month}}.gz' - ) - - # Sensor to wait for the pageview actor partitions. - sensor = RangeHivePartitionSensor( - task_id='wait_for_pageview_actor', - table_name=source_table, - from_timestamp='{{execution_date|start_of_current_month}}', - to_timestamp='{{execution_date|start_of_next_month}}', - granularity='@hourly', - poke_interval=timedelta(hours=1).total_seconds(), - ) - - # SQL operator for the computation of the metrics. - compute = SparkSqlOperator( - task_id='compute_uniques_metrics', - sql=var_props.get('compute_hql_path', - f'{hql_directory}/unique_devices/per_project_family' + - '/unique_devices_per_project_family_monthly.hql' - ), - query_parameters={ - 'source_table': source_table, - 'destination_table': intermediate_table, - 'year': '{{execution_date.year}}', - 'month': '{{execution_date.month}}', - 'coalesce_partitions': 1, - }, - driver_cores=1, - driver_memory='4G', - executor_cores=4, - executor_memory='16G', - conf={ - 'spark.dynamicAllocation.maxExecutors': 32, - 'spark.yarn.executor.memoryOverhead': 2048, - 'spark.sql.shuffle.partitions': 512, - }, - ) - - # SQL operator for the dump of the metrics. - dump = SparkSqlOperator( - task_id='dump_uniques_metrics', - sql=var_props.get('dump_hql_path', - f'{hql_directory}/unique_devices/per_project_family' + - '/unique_devices_per_project_family_monthly_to_archive.hql' - ), - query_parameters={ - 'source_table': intermediate_table, - 'destination_directory': temporary_directory, - 'year': '{{execution_date.year}}', - 'month': '{{execution_date.month}}', - }, - ) - - # Archive operator to cleanly move the dump to its final destination. 
- archive = HDFSArchiveOperator( - task_id='move_data_to_archive', - source_directory=temporary_directory, - check_done=True, - archive_file=destination_path, - archive_parent_umask='027', - archive_perms='640', - ) - - sensor >> compute >> dump >> archive diff --git a/analytics/dags/unique_devices/unique_devices_daily_dag.py b/analytics/dags/unique_devices/unique_devices_daily_dag.py new file mode 100644 index 0000000..c44003e --- /dev/null +++ b/analytics/dags/unique_devices/unique_devices_daily_dag.py @@ -0,0 +1,136 @@ +''' +Calculates metrics about unique devices on a daily basis. +Two sets of metrics are generated: per domain and per project family. +The metrics are stored in a Hive table and also as CSV archive files. +''' + +from airflow import DAG +from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor +from analytics.config.dag_config import ( + archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) +from datetime import datetime, timedelta +from wmf_airflow_common.config.variable_properties import VariableProperties +from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator +from wmf_airflow_common.operators.spark import SparkSqlOperator +from wmf_airflow_common.partitions_builder import daily_partitions +from wmf_airflow_common.templates.time_filters import filters + +dag_id = 'unique_devices_daily' +var_props = VariableProperties(f'{dag_id}_config') + +# Contains the properties of each of the datasets produced by this DAG. +datasets = var_props.get('datasets', { + 'per_domain': { + 'compute_hql_path': ( + f'{hql_directory}/unique_devices/per_domain/' + + 'unique_devices_per_domain_daily.hql' + ), + 'dump_hql_path': ( + f'{hql_directory}/unique_devices/per_domain/' + + 'unique_devices_per_domain_daily_to_archive.hql' + ), + 'intermediate_table': 'wmf.unique_devices_per_domain_daily', + 'temporary_directory': f'{hdfs_temp_directory}/{dag_id}/per_domain/{{{{ds}}}}', + 'destination_path': ( + f'{archive_directory}/unique_devices/per_domain/' + + '{{execution_date.year}}/{{execution_date|to_ds_month}}/' + + 'unique_devices_per_domain_daily-{{ds}}.gz' + ), + }, + 'per_project_family': { + 'compute_hql_path': ( + f'{hql_directory}/unique_devices/per_project_family/' + + 'unique_devices_per_project_family_daily.hql' + ), + 'dump_hql_path': ( + f'{hql_directory}/unique_devices/per_project_family/' + + 'unique_devices_per_project_family_daily_to_archive.hql' + ), + 'intermediate_table': 'wmf.unique_devices_per_project_family_daily', + 'temporary_directory': f'{hdfs_temp_directory}/{dag_id}/per_project_family/{{{{ds}}}}', + 'destination_path': ( + f'{archive_directory}/unique_devices/per_project_family/' + + '{{execution_date.year}}/{{execution_date|to_ds_month}}/' + + 'unique_devices_per_project_family_daily-{{ds}}.gz' + ), + }, +}) + +with DAG( + dag_id=dag_id, + doc_md=__doc__, + start_date=var_props.get_datetime('start_date', datetime(2022, 9, 15)), + schedule_interval='@daily', + tags=['unique_devices', 'archive'], + user_defined_filters=filters, + default_args=var_props.get_merged('default_args', { + **spark3_default_args, + 'sla': timedelta(hours=6), + }), +) as dag: + + # The pageview actor table to use as a source. + source_table = 'wmf.pageview_actor' + + # Sensor to wait for the pageview actor partitions. 
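+ # daily_partitions with @hourly granularity presumably expands to the 24 hourly
+ # partition names of the execution day; the single sensor (re-checked every 20
+ # minutes) then gates both the per-domain and per-project-family branches below.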
+ sensor = NamedHivePartitionSensor( + task_id='wait_for_pageview_actor', + partition_names=daily_partitions( + table=source_table, + granularity='@hourly', + ), + poke_interval=timedelta(minutes=20).total_seconds(), + ) + + # From here on, the DAG splits in 2 analogous subdags: one for + # uniques per domain and the other for uniques per project family. + for dataset_name, dataset_props in datasets.items(): + + # SQL operator for the computation of the metrics. + compute = SparkSqlOperator( + task_id=f'compute_{dataset_name}_metrics', + sql=dataset_props['compute_hql_path'], + query_parameters={ + 'source_table': source_table, + 'destination_table': dataset_props['intermediate_table'], + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'day': '{{execution_date.day}}', + 'coalesce_partitions': 1, + }, + driver_cores=1, + driver_memory='4G', + executor_cores=2, + executor_memory='8G', + conf={ + 'spark.dynamicAllocation.maxExecutors': 32, + 'spark.yarn.executor.memoryOverhead': 2048, + 'spark.sql.shuffle.partitions': 512, + }, + ) + + # SQL operator for the dump of the metrics. + dump = SparkSqlOperator( + task_id=f'dump_{dataset_name}_metrics', + sql=dataset_props['dump_hql_path'], + query_parameters={ + 'source_table': dataset_props['intermediate_table'], + 'destination_directory': dataset_props['temporary_directory'], + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'day': '{{execution_date.day}}', + }, + ) + + # Archive operator to cleanly move the dump to its final destination. + archive = HDFSArchiveOperator( + task_id=f'move_{dataset_name}_data_to_archive', + refinery_job_shaded_jar_local_path=artifact(''), + source_directory=dataset_props['temporary_directory'], + check_done=True, + archive_file=dataset_props['destination_path'], + archive_parent_umask='027', + archive_perms='640', + ) + + sensor >> compute >> dump >> archive diff --git a/analytics/dags/unique_devices/unique_devices_monthly_dag.py b/analytics/dags/unique_devices/unique_devices_monthly_dag.py new file mode 100644 index 0000000..218d027 --- /dev/null +++ b/analytics/dags/unique_devices/unique_devices_monthly_dag.py @@ -0,0 +1,139 @@ +''' +Calculates metrics about unique devices on a monthly basis. +Two sets of metrics are generated: per domain and per project family. +The metrics are stored in a Hive table and also as CSV archive files. +''' + +from airflow import DAG +from analytics.config.dag_config import ( + archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) +from datetime import datetime, timedelta +from wmf_airflow_common.config.variable_properties import VariableProperties +from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator +from wmf_airflow_common.operators.spark import SparkSqlOperator +from wmf_airflow_common.partitions_builder import daily_partitions +from wmf_airflow_common.sensors.hive import RangeHivePartitionSensor +from wmf_airflow_common.templates.time_filters import filters + +dag_id = 'unique_devices_monthly' +var_props = VariableProperties(f'{dag_id}_config') + +# Contains the properties of each of the datasets produced by this DAG. 
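+# The dict below is only the default: since it is read via var_props.get('datasets', ...),
+# the whole structure can presumably be overridden from the unique_devices_monthly_config
+# Airflow variable, e.g. to point HQL paths or destination paths elsewhere for testing.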
+datasets = var_props.get('datasets', { + 'per_domain': { + 'compute_hql_path': ( + f'{hql_directory}/unique_devices/per_domain/' + + 'unique_devices_per_domain_monthly.hql' + ), + 'dump_hql_path': ( + f'{hql_directory}/unique_devices/per_domain/' + + 'unique_devices_per_domain_monthly_to_archive.hql' + ), + 'intermediate_table': 'wmf.unique_devices_per_domain_monthly', + 'temporary_directory': ( + f'{hdfs_temp_directory}/{dag_id}/per_domain/' + + '{{execution_date|to_ds_month}}' + ), + 'destination_path': ( + f'{archive_directory}/unique_devices/per_domain/' + + '{{execution_date.year}}/{{execution_date|to_ds_month}}/' + + 'unique_devices_per_domain_monthly-{{execution_date|to_ds_month}}.gz' + ), + }, + 'per_project_family': { + 'compute_hql_path': ( + f'{hql_directory}/unique_devices/per_project_family/' + + 'unique_devices_per_project_family_monthly.hql' + ), + 'dump_hql_path': ( + f'{hql_directory}/unique_devices/per_project_family/' + + 'unique_devices_per_project_family_monthly_to_archive.hql' + ), + 'intermediate_table': 'wmf.unique_devices_per_project_family_monthly', + 'temporary_directory': ( + f'{hdfs_temp_directory}/{dag_id}/per_project_family/' + + '{{execution_date|to_ds_month}}' + ), + 'destination_path': ( + f'{archive_directory}/unique_devices/per_project_family/' + + '{{execution_date.year}}/{{execution_date|to_ds_month}}/' + + 'unique_devices_per_project_family_monthly-{{execution_date|to_ds_month}}.gz' + ), + }, +}) + +with DAG( + dag_id=dag_id, + doc_md=__doc__, + start_date=var_props.get_datetime('start_date', datetime(2022, 9, 1)), + schedule_interval='@monthly', + tags=['unique_devices', 'archive'], + user_defined_filters=filters, + default_args=var_props.get_merged('default_args', { + **spark3_default_args, + 'sla': timedelta(days=1), + }), +) as dag: + + # The pageview actor table to use as a source. + source_table = 'wmf.pageview_actor' + + # Sensor to wait for the pageview actor partitions. + sensor = RangeHivePartitionSensor( + task_id='wait_for_pageview_actor', + table_name=source_table, + from_timestamp='{{execution_date|start_of_current_month}}', + to_timestamp='{{execution_date|start_of_next_month}}', + granularity='@hourly', + poke_interval=timedelta(hours=1).total_seconds(), + ) + + # From here on, the DAG splits in 2 analogous subdags: one for + # uniques per domain and the other for uniques per project family. + for dataset_name, dataset_props in datasets.items(): + + # SQL operator for the computation of the metrics. + compute = SparkSqlOperator( + task_id=f'compute_{dataset_name}_metrics', + sql=dataset_props['compute_hql_path'], + query_parameters={ + 'source_table': source_table, + 'destination_table': dataset_props['intermediate_table'], + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + 'coalesce_partitions': 1, + }, + driver_cores=1, + driver_memory='4G', + executor_cores=4, + executor_memory='16G', + conf={ + 'spark.dynamicAllocation.maxExecutors': 32, + 'spark.yarn.executor.memoryOverhead': 2048, + 'spark.sql.shuffle.partitions': 512, + }, + ) + + # SQL operator for the dump of the metrics. + dump = SparkSqlOperator( + task_id=f'dump_{dataset_name}_metrics', + sql=dataset_props['dump_hql_path'], + query_parameters={ + 'source_table': dataset_props['intermediate_table'], + 'destination_directory': dataset_props['temporary_directory'], + 'year': '{{execution_date.year}}', + 'month': '{{execution_date.month}}', + }, + ) + + # Archive operator to cleanly move the dump to its final destination. 
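+ # Because the tasks are created inside the for-loop, each dataset gets its own
+ # compute >> dump >> archive chain hanging off the shared sensor, giving the merged
+ # DAGs 7 tasks (1 sensor + 2 datasets x 3 tasks) instead of 4, as the tests assert.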
+ archive = HDFSArchiveOperator( + task_id=f'move_{dataset_name}_data_to_archive', + source_directory=dataset_props['temporary_directory'], + check_done=True, + archive_file=dataset_props['destination_path'], + archive_parent_umask='027', + archive_perms='640', + ) + + sensor >> compute >> dump >> archive -- GitLab From 3d6b84e7740aa5cba47c7179131c3381a613582d Mon Sep 17 00:00:00 2001 From: Marcel Ruiz Forns Date: Thu, 22 Sep 2022 23:07:43 +0200 Subject: [PATCH 4/6] also merging tests into 2 DAGs --- .../unique_devices_per_domain_daily_dag_test.py | 12 ------------ .../unique_devices_per_domain_monthly_dag_test.py | 12 ------------ ...ique_devices_per_project_family_daily_dag_test.py | 12 ------------ ...ue_devices_per_project_family_monthly_dag_test.py | 12 ------------ .../unique_devices/unique_devices_daily_dag_test.py | 11 +++++++++++ .../unique_devices_monthly_dag_test.py | 11 +++++++++++ 6 files changed, 22 insertions(+), 48 deletions(-) delete mode 100644 tests/analytics/unique_devices/per_domain/unique_devices_per_domain_daily_dag_test.py delete mode 100644 tests/analytics/unique_devices/per_domain/unique_devices_per_domain_monthly_dag_test.py delete mode 100644 tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag_test.py delete mode 100644 tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag_test.py create mode 100644 tests/analytics/unique_devices/unique_devices_daily_dag_test.py create mode 100644 tests/analytics/unique_devices/unique_devices_monthly_dag_test.py diff --git a/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_daily_dag_test.py b/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_daily_dag_test.py deleted file mode 100644 index c2373b7..0000000 --- a/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_daily_dag_test.py +++ /dev/null @@ -1,12 +0,0 @@ -import pytest - -@pytest.fixture(name='dag_path') -def fixture_dagpath(): - return ['analytics', 'dags', 'unique_devices', 'per_domain', - 'unique_devices_per_domain_daily_dag.py'] - -def test_unique_devices_per_domain_daily_dag_loaded(airflow, dagbag): - assert dagbag.import_errors == {} - dag = dagbag.get_dag(dag_id="unique_devices_per_domain_daily") - assert dag is not None - assert len(dag.tasks) == 4 diff --git a/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_monthly_dag_test.py b/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_monthly_dag_test.py deleted file mode 100644 index 7ab42f1..0000000 --- a/tests/analytics/unique_devices/per_domain/unique_devices_per_domain_monthly_dag_test.py +++ /dev/null @@ -1,12 +0,0 @@ -import pytest - -@pytest.fixture(name='dag_path') -def fixture_dagpath(): - return ['analytics', 'dags', 'unique_devices', 'per_domain', - 'unique_devices_per_domain_monthly_dag.py'] - -def test_unique_devices_per_domain_monthly_dag_loaded(airflow, dagbag): - assert dagbag.import_errors == {} - dag = dagbag.get_dag(dag_id="unique_devices_per_domain_monthly") - assert dag is not None - assert len(dag.tasks) == 4 diff --git a/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag_test.py b/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag_test.py deleted file mode 100644 index 3deb84a..0000000 --- a/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_daily_dag_test.py +++ /dev/null @@ -1,12 +0,0 @@ -import pytest - 
-@pytest.fixture(name='dag_path') -def fixture_dagpath(): - return ['analytics', 'dags', 'unique_devices', 'per_project_family', - 'unique_devices_per_project_family_daily_dag.py'] - -def test_unique_devices_per_project_family_daily_dag_loaded(airflow, dagbag): - assert dagbag.import_errors == {} - dag = dagbag.get_dag(dag_id="unique_devices_per_project_family_daily") - assert dag is not None - assert len(dag.tasks) == 4 diff --git a/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag_test.py b/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag_test.py deleted file mode 100644 index 69d1237..0000000 --- a/tests/analytics/unique_devices/per_project_family/unique_devices_per_project_family_monthly_dag_test.py +++ /dev/null @@ -1,12 +0,0 @@ -import pytest - -@pytest.fixture(name='dag_path') -def fixture_dagpath(): - return ['analytics', 'dags', 'unique_devices', 'per_project_family', - 'unique_devices_per_project_family_monthly_dag.py'] - -def test_unique_devices_per_project_family_monthly_dag_loaded(airflow, dagbag): - assert dagbag.import_errors == {} - dag = dagbag.get_dag(dag_id="unique_devices_per_project_family_monthly") - assert dag is not None - assert len(dag.tasks) == 4 diff --git a/tests/analytics/unique_devices/unique_devices_daily_dag_test.py b/tests/analytics/unique_devices/unique_devices_daily_dag_test.py new file mode 100644 index 0000000..a729d86 --- /dev/null +++ b/tests/analytics/unique_devices/unique_devices_daily_dag_test.py @@ -0,0 +1,11 @@ +import pytest + +@pytest.fixture(name='dag_path') +def fixture_dagpath(): + return ['analytics', 'dags', 'unique_devices', 'unique_devices_daily_dag.py'] + +def test_unique_devices_daily_dag_loaded(airflow, dagbag): + assert dagbag.import_errors == {} + dag = dagbag.get_dag(dag_id="unique_devices_daily") + assert dag is not None + assert len(dag.tasks) == 7 diff --git a/tests/analytics/unique_devices/unique_devices_monthly_dag_test.py b/tests/analytics/unique_devices/unique_devices_monthly_dag_test.py new file mode 100644 index 0000000..de29e88 --- /dev/null +++ b/tests/analytics/unique_devices/unique_devices_monthly_dag_test.py @@ -0,0 +1,11 @@ +import pytest + +@pytest.fixture(name='dag_path') +def fixture_dagpath(): + return ['analytics', 'dags', 'unique_devices', 'unique_devices_monthly_dag.py'] + +def test_unique_devices_daily_dag_loaded(airflow, dagbag): + assert dagbag.import_errors == {} + dag = dagbag.get_dag(dag_id="unique_devices_monthly") + assert dag is not None + assert len(dag.tasks) == 7 -- GitLab From 56d51f1a21dc442f5105eb479b31312e699b5898 Mon Sep 17 00:00:00 2001 From: Marcel Ruiz Forns Date: Fri, 23 Sep 2022 15:15:32 +0200 Subject: [PATCH 5/6] remove forgotten line --- analytics/dags/unique_devices/unique_devices_daily_dag.py | 1 - 1 file changed, 1 deletion(-) diff --git a/analytics/dags/unique_devices/unique_devices_daily_dag.py b/analytics/dags/unique_devices/unique_devices_daily_dag.py index c44003e..7eb6232 100644 --- a/analytics/dags/unique_devices/unique_devices_daily_dag.py +++ b/analytics/dags/unique_devices/unique_devices_daily_dag.py @@ -125,7 +125,6 @@ with DAG( # Archive operator to cleanly move the dump to its final destination. 
archive = HDFSArchiveOperator( task_id=f'move_{dataset_name}_data_to_archive', - refinery_job_shaded_jar_local_path=artifact(''), source_directory=dataset_props['temporary_directory'], check_done=True, archive_file=dataset_props['destination_path'], -- GitLab From 0dc0a7cc2b465e2b4ae1551c9bc59578991eb2b1 Mon Sep 17 00:00:00 2001 From: Marcel Ruiz Forns Date: Mon, 26 Sep 2022 18:53:30 +0200 Subject: [PATCH 6/6] small nits --- analytics/dags/unique_devices/unique_devices_daily_dag.py | 6 +++--- analytics/dags/unique_devices/unique_devices_monthly_dag.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/analytics/dags/unique_devices/unique_devices_daily_dag.py b/analytics/dags/unique_devices/unique_devices_daily_dag.py index 7eb6232..b40c14d 100644 --- a/analytics/dags/unique_devices/unique_devices_daily_dag.py +++ b/analytics/dags/unique_devices/unique_devices_daily_dag.py @@ -7,7 +7,7 @@ The metrics are stored in a Hive table and also as CSV archive files. from airflow import DAG from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor from analytics.config.dag_config import ( - archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) + archive_directory, hdfs_temp_directory, hql_directory, default_args) from datetime import datetime, timedelta from wmf_airflow_common.config.variable_properties import VariableProperties from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator @@ -64,7 +64,7 @@ with DAG( tags=['unique_devices', 'archive'], user_defined_filters=filters, default_args=var_props.get_merged('default_args', { - **spark3_default_args, + **default_args, 'sla': timedelta(hours=6), }), ) as dag: @@ -103,7 +103,7 @@ with DAG( executor_cores=2, executor_memory='8G', conf={ - 'spark.dynamicAllocation.maxExecutors': 32, + 'spark.dynamicAllocation.maxExecutors': 64, 'spark.yarn.executor.memoryOverhead': 2048, 'spark.sql.shuffle.partitions': 512, }, diff --git a/analytics/dags/unique_devices/unique_devices_monthly_dag.py b/analytics/dags/unique_devices/unique_devices_monthly_dag.py index 218d027..81df3c8 100644 --- a/analytics/dags/unique_devices/unique_devices_monthly_dag.py +++ b/analytics/dags/unique_devices/unique_devices_monthly_dag.py @@ -6,7 +6,7 @@ The metrics are stored in a Hive table and also as CSV archive files. from airflow import DAG from analytics.config.dag_config import ( - archive_directory, hdfs_temp_directory, hql_directory, spark3_default_args) + archive_directory, hdfs_temp_directory, hql_directory, default_args) from datetime import datetime, timedelta from wmf_airflow_common.config.variable_properties import VariableProperties from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator @@ -70,7 +70,7 @@ with DAG( tags=['unique_devices', 'archive'], user_defined_filters=filters, default_args=var_props.get_merged('default_args', { - **spark3_default_args, + **default_args, 'sla': timedelta(days=1), }), ) as dag: @@ -108,7 +108,7 @@ with DAG( executor_cores=4, executor_memory='16G', conf={ - 'spark.dynamicAllocation.maxExecutors': 32, + 'spark.dynamicAllocation.maxExecutors': 64, 'spark.yarn.executor.memoryOverhead': 2048, 'spark.sql.shuffle.partitions': 512, }, -- GitLab