diff --git a/analytics/dags/unique_devices/unique_devices_daily_dag.py b/analytics/dags/unique_devices/unique_devices_daily_dag.py
new file mode 100644
index 0000000000000000000000000000000000000000..b40c14d9289572c07bcae3be14ad797c5db2f31a
--- /dev/null
+++ b/analytics/dags/unique_devices/unique_devices_daily_dag.py
@@ -0,0 +1,135 @@
+'''
+Calculates metrics about unique devices on a daily basis.
+Two sets of metrics are generated: per domain and per project family.
+The metrics are stored in a Hive table and also as CSV archive files.
+'''
+
+from airflow import DAG
+from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
+from analytics.config.dag_config import (
+    archive_directory, hdfs_temp_directory, hql_directory, default_args)
+from datetime import datetime, timedelta
+from wmf_airflow_common.config.variable_properties import VariableProperties
+from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator
+from wmf_airflow_common.operators.spark import SparkSqlOperator
+from wmf_airflow_common.partitions_builder import daily_partitions
+from wmf_airflow_common.templates.time_filters import filters
+
+dag_id = 'unique_devices_daily'
+var_props = VariableProperties(f'{dag_id}_config')
+
+# Contains the properties of each of the datasets produced by this DAG.
+datasets = var_props.get('datasets', {
+    'per_domain': {
+        'compute_hql_path': (
+            f'{hql_directory}/unique_devices/per_domain/' +
+            'unique_devices_per_domain_daily.hql'
+        ),
+        'dump_hql_path': (
+            f'{hql_directory}/unique_devices/per_domain/' +
+            'unique_devices_per_domain_daily_to_archive.hql'
+        ),
+        'intermediate_table': 'wmf.unique_devices_per_domain_daily',
+        'temporary_directory': f'{hdfs_temp_directory}/{dag_id}/per_domain/{{{{ds}}}}',
+        'destination_path': (
+            f'{archive_directory}/unique_devices/per_domain/' +
+            '{{execution_date.year}}/{{execution_date|to_ds_month}}/' +
+            'unique_devices_per_domain_daily-{{ds}}.gz'
+        ),
+    },
+    'per_project_family': {
+        'compute_hql_path': (
+            f'{hql_directory}/unique_devices/per_project_family/' +
+            'unique_devices_per_project_family_daily.hql'
+        ),
+        'dump_hql_path': (
+            f'{hql_directory}/unique_devices/per_project_family/' +
+            'unique_devices_per_project_family_daily_to_archive.hql'
+        ),
+        'intermediate_table': 'wmf.unique_devices_per_project_family_daily',
+        'temporary_directory': f'{hdfs_temp_directory}/{dag_id}/per_project_family/{{{{ds}}}}',
+        'destination_path': (
+            f'{archive_directory}/unique_devices/per_project_family/' +
+            '{{execution_date.year}}/{{execution_date|to_ds_month}}/' +
+            'unique_devices_per_project_family_daily-{{ds}}.gz'
+        ),
+    },
+})
+
+with DAG(
+    dag_id=dag_id,
+    doc_md=__doc__,
+    start_date=var_props.get_datetime('start_date', datetime(2022, 9, 15)),
+    schedule_interval='@daily',
+    tags=['unique_devices', 'archive'],
+    user_defined_filters=filters,
+    default_args=var_props.get_merged('default_args', {
+        **default_args,
+        'sla': timedelta(hours=6),
+    }),
+) as dag:
+
+    # The pageview actor table to use as a source.
+    source_table = 'wmf.pageview_actor'
+
+    # Sensor to wait for the pageview actor partitions.
+    sensor = NamedHivePartitionSensor(
+        task_id='wait_for_pageview_actor',
+        partition_names=daily_partitions(
+            table=source_table,
+            granularity='@hourly',
+        ),
+        poke_interval=timedelta(minutes=20).total_seconds(),
+    )
+
+    # From here on, the DAG splits into 2 analogous branches: one for
+    # uniques per domain and the other for uniques per project family.
+    for dataset_name, dataset_props in datasets.items():
+
+        # SQL operator for the computation of the metrics.
+        compute = SparkSqlOperator(
+            task_id=f'compute_{dataset_name}_metrics',
+            sql=dataset_props['compute_hql_path'],
+            query_parameters={
+                'source_table': source_table,
+                'destination_table': dataset_props['intermediate_table'],
+                'year': '{{execution_date.year}}',
+                'month': '{{execution_date.month}}',
+                'day': '{{execution_date.day}}',
+                'coalesce_partitions': 1,
+            },
+            driver_cores=1,
+            driver_memory='4G',
+            executor_cores=2,
+            executor_memory='8G',
+            conf={
+                'spark.dynamicAllocation.maxExecutors': 64,
+                'spark.yarn.executor.memoryOverhead': 2048,
+                'spark.sql.shuffle.partitions': 512,
+            },
+        )
+
+        # SQL operator for the dump of the metrics.
+        dump = SparkSqlOperator(
+            task_id=f'dump_{dataset_name}_metrics',
+            sql=dataset_props['dump_hql_path'],
+            query_parameters={
+                'source_table': dataset_props['intermediate_table'],
+                'destination_directory': dataset_props['temporary_directory'],
+                'year': '{{execution_date.year}}',
+                'month': '{{execution_date.month}}',
+                'day': '{{execution_date.day}}',
+            },
+        )
+
+        # Archive operator to cleanly move the dump to its final destination.
+        archive = HDFSArchiveOperator(
+            task_id=f'move_{dataset_name}_data_to_archive',
+            source_directory=dataset_props['temporary_directory'],
+            check_done=True,
+            archive_file=dataset_props['destination_path'],
+            archive_parent_umask='027',
+            archive_perms='640',
+        )
+
+        sensor >> compute >> dump >> archive
diff --git a/analytics/dags/unique_devices/unique_devices_monthly_dag.py b/analytics/dags/unique_devices/unique_devices_monthly_dag.py
new file mode 100644
index 0000000000000000000000000000000000000000..81df3c8f39e26763661423a1403447a3b2ad173c
--- /dev/null
+++ b/analytics/dags/unique_devices/unique_devices_monthly_dag.py
@@ -0,0 +1,139 @@
+'''
+Calculates metrics about unique devices on a monthly basis.
+Two sets of metrics are generated: per domain and per project family.
+The metrics are stored in a Hive table and also as CSV archive files.
+'''
+
+from airflow import DAG
+from analytics.config.dag_config import (
+    archive_directory, hdfs_temp_directory, hql_directory, default_args)
+from datetime import datetime, timedelta
+from wmf_airflow_common.config.variable_properties import VariableProperties
+from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator
+from wmf_airflow_common.operators.spark import SparkSqlOperator
+from wmf_airflow_common.partitions_builder import daily_partitions
+from wmf_airflow_common.sensors.hive import RangeHivePartitionSensor
+from wmf_airflow_common.templates.time_filters import filters
+
+dag_id = 'unique_devices_monthly'
+var_props = VariableProperties(f'{dag_id}_config')
+
+# Contains the properties of each of the datasets produced by this DAG.
+datasets = var_props.get('datasets', {
+    'per_domain': {
+        'compute_hql_path': (
+            f'{hql_directory}/unique_devices/per_domain/' +
+            'unique_devices_per_domain_monthly.hql'
+        ),
+        'dump_hql_path': (
+            f'{hql_directory}/unique_devices/per_domain/' +
+            'unique_devices_per_domain_monthly_to_archive.hql'
+        ),
+        'intermediate_table': 'wmf.unique_devices_per_domain_monthly',
+        'temporary_directory': (
+            f'{hdfs_temp_directory}/{dag_id}/per_domain/' +
+            '{{execution_date|to_ds_month}}'
+        ),
+        'destination_path': (
+            f'{archive_directory}/unique_devices/per_domain/' +
+            '{{execution_date.year}}/{{execution_date|to_ds_month}}/' +
+            'unique_devices_per_domain_monthly-{{execution_date|to_ds_month}}.gz'
+        ),
+    },
+    'per_project_family': {
+        'compute_hql_path': (
+            f'{hql_directory}/unique_devices/per_project_family/' +
+            'unique_devices_per_project_family_monthly.hql'
+        ),
+        'dump_hql_path': (
+            f'{hql_directory}/unique_devices/per_project_family/' +
+            'unique_devices_per_project_family_monthly_to_archive.hql'
+        ),
+        'intermediate_table': 'wmf.unique_devices_per_project_family_monthly',
+        'temporary_directory': (
+            f'{hdfs_temp_directory}/{dag_id}/per_project_family/' +
+            '{{execution_date|to_ds_month}}'
+        ),
+        'destination_path': (
+            f'{archive_directory}/unique_devices/per_project_family/' +
+            '{{execution_date.year}}/{{execution_date|to_ds_month}}/' +
+            'unique_devices_per_project_family_monthly-{{execution_date|to_ds_month}}.gz'
+        ),
+    },
+})
+
+with DAG(
+    dag_id=dag_id,
+    doc_md=__doc__,
+    start_date=var_props.get_datetime('start_date', datetime(2022, 9, 1)),
+    schedule_interval='@monthly',
+    tags=['unique_devices', 'archive'],
+    user_defined_filters=filters,
+    default_args=var_props.get_merged('default_args', {
+        **default_args,
+        'sla': timedelta(days=1),
+    }),
+) as dag:
+
+    # The pageview actor table to use as a source.
+    source_table = 'wmf.pageview_actor'
+
+    # Sensor to wait for the pageview actor partitions.
+    sensor = RangeHivePartitionSensor(
+        task_id='wait_for_pageview_actor',
+        table_name=source_table,
+        from_timestamp='{{execution_date|start_of_current_month}}',
+        to_timestamp='{{execution_date|start_of_next_month}}',
+        granularity='@hourly',
+        poke_interval=timedelta(hours=1).total_seconds(),
+    )
+
+    # From here on, the DAG splits into 2 analogous branches: one for
+    # uniques per domain and the other for uniques per project family.
+    for dataset_name, dataset_props in datasets.items():
+
+        # SQL operator for the computation of the metrics.
+        compute = SparkSqlOperator(
+            task_id=f'compute_{dataset_name}_metrics',
+            sql=dataset_props['compute_hql_path'],
+            query_parameters={
+                'source_table': source_table,
+                'destination_table': dataset_props['intermediate_table'],
+                'year': '{{execution_date.year}}',
+                'month': '{{execution_date.month}}',
+                'coalesce_partitions': 1,
+            },
+            driver_cores=1,
+            driver_memory='4G',
+            executor_cores=4,
+            executor_memory='16G',
+            conf={
+                'spark.dynamicAllocation.maxExecutors': 64,
+                'spark.yarn.executor.memoryOverhead': 2048,
+                'spark.sql.shuffle.partitions': 512,
+            },
+        )
+
+        # SQL operator for the dump of the metrics.
+        dump = SparkSqlOperator(
+            task_id=f'dump_{dataset_name}_metrics',
+            sql=dataset_props['dump_hql_path'],
+            query_parameters={
+                'source_table': dataset_props['intermediate_table'],
+                'destination_directory': dataset_props['temporary_directory'],
+                'year': '{{execution_date.year}}',
+                'month': '{{execution_date.month}}',
+            },
+        )
+
+        # Archive operator to cleanly move the dump to its final destination.
+        archive = HDFSArchiveOperator(
+            task_id=f'move_{dataset_name}_data_to_archive',
+            source_directory=dataset_props['temporary_directory'],
+            check_done=True,
+            archive_file=dataset_props['destination_path'],
+            archive_parent_umask='027',
+            archive_perms='640',
+        )
+
+        sensor >> compute >> dump >> archive
diff --git a/tests/analytics/unique_devices/unique_devices_daily_dag_test.py b/tests/analytics/unique_devices/unique_devices_daily_dag_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..a729d869ff3de2a6052935bdcd05338066acf3cf
--- /dev/null
+++ b/tests/analytics/unique_devices/unique_devices_daily_dag_test.py
@@ -0,0 +1,11 @@
+import pytest
+
+@pytest.fixture(name='dag_path')
+def fixture_dagpath():
+    return ['analytics', 'dags', 'unique_devices', 'unique_devices_daily_dag.py']
+
+def test_unique_devices_daily_dag_loaded(airflow, dagbag):
+    assert dagbag.import_errors == {}
+    dag = dagbag.get_dag(dag_id="unique_devices_daily")
+    assert dag is not None
+    assert len(dag.tasks) == 7
diff --git a/tests/analytics/unique_devices/unique_devices_monthly_dag_test.py b/tests/analytics/unique_devices/unique_devices_monthly_dag_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..de29e88f4be6cdf71c1431ac7b2f718b9ac5ce88
--- /dev/null
+++ b/tests/analytics/unique_devices/unique_devices_monthly_dag_test.py
@@ -0,0 +1,11 @@
+import pytest
+
+@pytest.fixture(name='dag_path')
+def fixture_dagpath():
+    return ['analytics', 'dags', 'unique_devices', 'unique_devices_monthly_dag.py']
+
+def test_unique_devices_monthly_dag_loaded(airflow, dagbag):
+    assert dagbag.import_errors == {}
+    dag = dagbag.get_dag(dag_id="unique_devices_monthly")
+    assert dag is not None
+    assert len(dag.tasks) == 7
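
The `len(dag.tasks) == 7` assertions follow from one shared sensor plus three tasks (compute, dump, archive) for each of the two datasets. Below is a minimal sketch of a stricter check on the daily DAG, reusing the same `airflow` and `dagbag` fixtures as the tests above; the expected IDs are derived from the `task_id` f-strings in `unique_devices_daily_dag.py`, and the test function name itself is hypothetical, not part of the patch.

```python
# Sketch only: assert the exact task IDs of the daily DAG, assuming the same
# 'airflow' and 'dagbag' fixtures used by the tests above are available.
def test_unique_devices_daily_dag_task_ids(airflow, dagbag):
    dag = dagbag.get_dag(dag_id='unique_devices_daily')
    # One shared sensor task...
    expected = {'wait_for_pageview_actor'}
    # ...plus compute/dump/archive tasks per dataset, matching the task_id
    # f-strings in unique_devices_daily_dag.py.
    for dataset in ('per_domain', 'per_project_family'):
        expected |= {
            f'compute_{dataset}_metrics',
            f'dump_{dataset}_metrics',
            f'move_{dataset}_data_to_archive',
        }
    assert {task.task_id for task in dag.tasks} == expected
```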