Commit c306f266 authored by Aqu

Migrate the clickstream job to Airflow

parent 51a203fa
"""
### Creates the clickstream monthly dataset for various projects.
The job runs every month and its TSV results are synchronised to a public location.
The Airflow DAG launches a Spark action that runs the ClickstreamBuilder Scala job in
analytics-refinery-source/refinery-job and saves the results in a temporary folder.
The files are then moved from the temporary folder to an archive location and given
human-readable names.
* Temporary result location example:
/wmf/tmp/analytics/clickstream_monthly__create_clickstream_file__20220301/
* Archive example:
/wmf/data/archive/clickstream/2022-03/clickstream-enwiki-2022-03.tsv.gz
#### Sources:
- project_namespace_table: wmf_raw.mediawiki_project_namespace_map
- page_table: wmf_raw.mediawiki_page
- redirect_table: wmf_raw.mediawiki_redirect
- pagelinks_table: wmf_raw.mediawiki_pagelinks
- pageview_actor_table: wmf.pageview_actor
#### Variables stored in clickstream_monthly_config with their default values
* clickstream_archive_base_path: '/wmf/data/archive/clickstream'
* default_args: {'sla': timedelta(days=2)}
* wiki_list: ["enwiki", "ruwiki", "dewiki", "eswiki", "jawiki", "frwiki", "zhwiki", "itwiki",
"plwiki", "ptwiki", "fawiki"]
* start_date: datetime(2022, 3, 31)
* refinery_job_jar: artifact('refinery-job-0.1.27-shaded.jar')
* clickstream_minimum_links: 10
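#### Example configuration override
The variables above are read from the clickstream_monthly_config Airflow Variable, stored as JSON.
A hypothetical override (illustrative values only; omitted keys keep the defaults above):
{"wiki_list": ["enwiki", "dewiki"], "clickstream_minimum_links": 20}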
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
from wmf_airflow_common.operators.spark import SparkSubmitOperator
from wmf_airflow_common.templates.time_filters import filters
from wmf_airflow_common.sensors.hive import RangeHivePartitionSensor
from wmf_airflow_common.partitions_builder import add_post_partitions, PrePartitions
from wmf_airflow_common.config.variable_properties import VariableProperties
from analytics.config.dag_config import default_args, artifact
from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator
dag_id = 'clickstream_monthly'
snapshot = '{{execution_date | to_ds_month}}'
var_props = VariableProperties(f'{dag_id}_config')
tmp_base_path = var_props.get('output_base_path', f'/wmf/tmp/analytics/{dag_id}__{snapshot}')
clickstream_archive_base_path = var_props.get(
'clickstream_archive_base_path',
'/wmf/data/archive/clickstream'
)
default_args = var_props.get_merged(
'default_args',
{**default_args, 'sla': timedelta(days=2)}
)
# List of MediaWiki databases to create an archive for.
wiki_list_default = \
'enwiki ruwiki dewiki eswiki jawiki frwiki zhwiki itwiki plwiki ptwiki fawiki'.split()
wiki_list = var_props.get_list('wiki_list', wiki_list_default)
with DAG(dag_id=dag_id,
doc_md=__doc__,
start_date=var_props.get_datetime('start_date', datetime(2022, 4, 1)),
schedule_interval='@monthly',
tags=['archive'],
user_defined_filters=filters,
default_args=default_args) as dag:
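# Wait for all source data of the month before running the Spark job: every hourly
# wmf.pageview_actor partition of the month, plus the monthly mediawiki snapshots
# (with per-wiki partitions for page, pagelinks and redirect).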
sensors = []
sensors.append(RangeHivePartitionSensor(
task_id='wait_for_pageview_actor_partitions',
table_name='wmf.pageview_actor',
from_timestamp='{{execution_date | start_of_current_month}}',
to_timestamp='{{execution_date | start_of_next_month}}',
granularity='@hourly'
))
sensors.append(NamedHivePartitionSensor(
task_id='wait_for_mediawiki_project_namespace_map_snapshot',
partition_names=[f'wmf_raw.mediawiki_project_namespace_map/snapshot={snapshot}']
))
wiki_db_partitions = PrePartitions([[f'wiki_db={db}' for db in wiki_list]])
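# add_post_partitions appends one wiki_db option per wiki to each snapshot partition,
# e.g. wmf_raw.mediawiki_page/snapshot=2022-03/wiki_db=enwiki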
sensors.append(NamedHivePartitionSensor(
task_id='wait_for_mediawiki_page_snapshots',
partition_names=add_post_partitions(
[f'wmf_raw.mediawiki_page/snapshot={snapshot}'],
wiki_db_partitions
)
))
sensors.append(NamedHivePartitionSensor(
task_id='wait_for_mediawiki_pagelinks_snapshots',
partition_names=add_post_partitions(
[f'wmf_raw.mediawiki_pagelinks/snapshot={snapshot}'],
wiki_db_partitions
)
))
sensors.append(NamedHivePartitionSensor(
task_id='wait_for_mediawiki_redirect_snapshots',
partition_names=add_post_partitions(
[f'wmf_raw.mediawiki_redirect/snapshot={snapshot}'],
wiki_db_partitions
)
))
etl = SparkSubmitOperator(
task_id='clickstream_builder',
# Spark configuration optimized for this job.
# This job needs more memory to handle the pagelinks dataset. With the default 4GB of
# memory it triggers out-of-memory errors and Spark tasks are recomputed; the job
# eventually finishes, but slowly.
executor_memory='8G',
conf={'spark.dynamicAllocation.maxExecutors': 128},
# Spark application
application=var_props.get('refinery_job_jar', artifact('refinery-job-0.1.27-shaded.jar')),
java_class='org.wikimedia.analytics.refinery.job.ClickstreamBuilder',
application_args={
'--output-base-path': tmp_base_path,
'--wikis': ','.join(wiki_list),
'--snapshot': snapshot,
'--year': '{{execution_date.year}}',
'--month': '{{execution_date.month}}',
# The minimum count for a link to appear in the dataset. Defaults to 10.
'--minimum-count': var_props.get('clickstream_minimum_links', 10),
'--output-files-parts': 1 # Make sure only 1 file per wiki is generated.
}
)
archivers = [
HDFSArchiveOperator(
source_directory=f"{tmp_base_path}/wiki_db={wiki}",
archive_file=f"{clickstream_archive_base_path}"
f"/{snapshot}"
f"/clickstream-{wiki}-{snapshot}.tsv.gz",
task_id=f'archive_{wiki}'
) for wiki in wiki_list
]
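# All sensors must succeed before the Spark job runs; each archiver then moves one
# wiki's output from the temporary folder to its final archive file.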
sensors >> etl >> archivers
@@ -33,6 +33,8 @@ test =
flake8 ==4.0.*
mypy ==0.931
pylint ==2.12.*
dev =
ipython ==8.3.*
[tool:pytest]
......
import os
import pytest
from unittest import mock
from airflow.models import DagBag
# This is needed because clickstream_monthly_dag uses Variables.
# TODO: make this into a common fixture usable by all dag tests.
@pytest.fixture(name='airflow', autouse=True, scope="session")
def fixture_airflow(tmp_path_factory):
"""
Sets up an Airflow SQLite database and Airflow home
fixture used by the entire test session.
"""
from airflow.utils import db
airflow_environ = {
'AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS': "False",
'AIRFLOW__CORE__LOAD_EXAMPLES': 'False',
'AIRFLOW__CORE__UNIT_TEST_MODE': 'True',
'AIRFLOW_HOME': os.path.join(tmp_path_factory.mktemp('airflow_home'))
}
with mock.patch.dict(os.environ, airflow_environ, clear=True):
db.resetdb()
yield
# TODO: Make a common fixture that automatically loads and tests the validity of all dags.
# https://www.astronomer.io/events/recaps/testing-airflow-to-bulletproof-your-code/
@pytest.fixture(name='dagbag')
def fixture_dagbag():
dag_bag = DagBag(None, include_examples=False, read_dags_from_db=False)
dag_file = os.path.join(os.path.abspath(os.getcwd()),
'analytics', 'dags', 'clickstream', 'clickstream_monthly_dag.py')
dag_bag.process_file(dag_file)
return dag_bag
def test_clickstream_monthly_dag_loaded(airflow, dagbag):
assert dagbag.import_errors == {}
dag = dagbag.get_dag(dag_id="clickstream_monthly")
assert dag is not None
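# 5 partition sensors + 1 Spark job + 11 archivers (one per wiki) = 17 tasks.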
assert len(dag.tasks) == 17
mw_page_sensor = dag.get_task('wait_for_mediawiki_page_snapshots')
partitions = mw_page_sensor.partition_names
assert len(partitions) == 11
partitions.sort()
assert partitions[0] == \
'wmf_raw.mediawiki_page/snapshot={{execution_date | to_ds_month}}/wiki_db=dewiki'
@@ -96,7 +96,7 @@ def test_resolve_connection(spark_submit_hook_kwargs, expected):
pytest.param(
{
'driver_cores': '2',
'driver_cores': 2,
'driver_java_options': '-Dmy.prop=fake',
'application': 'fake_app.jar',
'master': 'yarn',
@@ -136,7 +136,7 @@ def test_build_spark_submit_command(spark_submit_hook_kwargs, expected):
pytest.param(
{
'driver_cores': '2',
'driver_cores': 2,
'driver_java_options': '-Dmy.prop=fake',
'application': 'fake_app.jar',
'master': 'yarn',
......
import pendulum
from wmf_airflow_common.partitions_builder import (
partition_names_by_granularity, _get_partition_time_part,
_build_pre_partitions_options, _add_pre_partition_options,
build_partition_names, _build_partition_timestamps)
partition_names_by_granularity,
_get_partition_time_part,
_build_partitions_options,
_add_pre_partition_options,
build_partition_names,
_build_partition_timestamps,
add_post_partitions
)
def test_partition_name_by_granularity():
@@ -45,11 +50,11 @@ def test_get_partition_part():
def test_build_pre_partitions_options():
assert _build_pre_partitions_options(['type=1', ['dc=a', 'dc=b']]) == (
assert _build_partitions_options(['type=1', ['dc=a', 'dc=b']]) == (
[['type=1', 'dc=a'], ['type=1', 'dc=b']])
assert _build_pre_partitions_options([['dc=a', 'dc=b'], 'type=1']) == (
assert _build_partitions_options([['dc=a', 'dc=b'], 'type=1']) == (
[['dc=a', 'type=1'], ['dc=b', 'type=1']])
assert _build_pre_partitions_options([[1, 2, 3], 4, [5, 6]]) == (
assert _build_partitions_options([[1, 2, 3], 4, [5, 6]]) == (
[[1, 4, 5], [1, 4, 6], [2, 4, 5], [2, 4, 6], [3, 4, 5], [3, 4, 6]])
@@ -85,3 +90,9 @@ def test_build_partition_timestamps():
from_ts = '2022-03-08T00:00:00+00:00'
to_ts = '2022-03-09T01:59:59.999999'
assert len(_build_partition_timestamps(from_ts, to_ts, '@hourly')) == 25
def test_add_post_partitions():
partitions = ['year=2022/month=1']
post_partitions = [['wiki=en', 'wiki=zh']]
assert add_post_partitions(partitions, post_partitions) == \
['year=2022/month=1/wiki=en', 'year=2022/month=1/wiki=zh']
from wmf_airflow_common.config.variable_properties import VariableProperties
import unittest.mock as mock
from pytest import raises
def test_get_list(monkeypatch):
json_var = '{"a_list_var": ["a", "b"], "not_a_list_var": 1}'
with mock.patch.dict('os.environ', AIRFLOW_VAR_MYDAG_CONFIG=json_var):
varprop = VariableProperties('mydag_config')
assert varprop.get_list('a_list_var', []) == ['a', 'b']
assert varprop.get_list('missing_var', ['a_default']) == ['a_default']
with raises(ValueError):
varprop.get_list('not_a_list_var', [])
@@ -93,6 +93,18 @@ class VariableProperties:
self.get_parsed(property_name, datetime.fromisoformat, default_value)
)
def get_list(self, property_name: str, default_value: list) -> list:
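"""
Returns the property parsed as a list, falling back to default_value when the
property is absent. Raises ValueError if the default or the resolved value is not a list.
"""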
if type(default_value) is not list:
raise ValueError('Default value is not a list.')
no_parsing = lambda x: x
result = cast(
list,
self.get_parsed(property_name, no_parsing, default_value)
)
if type(result) is not list:
raise ValueError('Value is not a list.')
return result
def get_timedelta(self, property_name: str, default_value: timedelta) -> timedelta:
if type(default_value) is not timedelta:
raise ValueError('Default value is not a timedelta.')
......
@@ -24,7 +24,7 @@ class SparkSubmitHook(AirflowSparkSubmitHook):
def __init__(
self,
application: str,
driver_cores: Optional[str] = None,
driver_cores: Optional[int] = None,
driver_java_options: Optional[str] = None,
master: Optional[str] = 'yarn',
deploy_mode: Optional[str] = 'client',
@@ -47,7 +47,7 @@ class SparkSubmitHook(AirflowSparkSubmitHook):
:param driver_cores:
spark-submit --driver-cores, defaults to None.
:type driver_cores: Optional[str], optional
:type driver_cores: Optional[int], optional
:param driver_java_options:
spark-submit --driver-java-options, defaults to None.
......
@@ -12,15 +12,17 @@ class HDFSArchiveOperator(BashOperator):
- deletes the source directory
example of refinery_job_shaded_jar_local_path:
/srv/deployment/analytics/refinery/artifacts/org/wikimesdia/analytics/refinery/
/srv/deployment/analytics/refinery/artifacts/org/wikimedia/analytics/refinery/
refinery-job-0.1.20-shaded.jar
"""
def __init__(
self,
refinery_job_shaded_jar_local_path: str,
source_directory: str,
archive_file: str,
refinery_job_shaded_jar_local_path: str = '/srv/deployment/analytics/refinery/artifacts'
'/org/wikimedia/analytics/refinery/'
'refinery-job-0.1.27-shaded.jar',
archive_parent_umask: str = "022",
archive_perms: str = "644",
expected_filename_ending: str = ".gz",
@@ -42,6 +44,7 @@ class HDFSArchiveOperator(BashOperator):
:param kwargs:
"""
bash_command = f"""
((
java -cp
{refinery_job_shaded_jar_local_path}:$(/usr/bin/hadoop classpath)
org.wikimedia.analytics.refinery.job.HDFSArchiver
@@ -52,6 +55,7 @@ class HDFSArchiveOperator(BashOperator):
--expected_filename_ending "{expected_filename_ending}"
--check_done={str(check_done).lower()}
--done_file "{done_file}"
) || if [ $? = 1 ]; then exit 0; else exit 1; fi)
"""
bash_command = textwrap.dedent(bash_command).strip().replace('\n', ' ')
super().__init__(bash_command=bash_command, **kwargs)
@@ -23,7 +23,7 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
launcher: Optional[str] = None,
avoid_local_execution: bool = False,
command_preamble: Optional[str] = None,
driver_cores: Optional[str] = None,
driver_cores: Optional[int] = None,
driver_java_options: Optional[str] = None,
master: Optional[str] = 'local',
deploy_mode: Optional[str] = None,
......
@@ -175,23 +175,64 @@ def _get_partition_time_part(part: str, timestamp: Optional[datetime] = None) ->
return f'{part}={{{{execution_date.{part}}}}}'
def _add_pre_partition_options(partition: List[str], pre_partitions: PrePartitions) -> List[List[str]]:
"""Add the pre-partition options to the partition.
def _add_pre_partition_options(
partitions: List[str],
pre_partitions: PrePartitions) -> List[List[str]]:
"""
if not pre_partitions:
return [partition]
Add the options before the other partitions
eg: input: ['wmf.table', 'type=1'], ['dc=a', 'dc=b']
output: [['wmf.table', 'dc=a', 'type=1'], ['wmf.table', 'dc=b', 'type=1']]
"""
return _add_options_to_partitions(
pre=True,
partitions=partitions,
partition_options=pre_partitions
)
def add_post_partitions(partitions: List[str], post_partitions: PrePartitions) -> List[str]:
"""
Build a list of partitions by appending the post_partitions options after the provided
partitions.
:param partitions: eg: ['wmf.table', 'year=2022', 'month=1'] or ['wmf.table/year=2022/month=1']
:param post_partitions: A list describing the options to add to the partitions
eg [['dc=1', 'dc=2']]
:return: A list of partitions with the post-partition options appended
eg: ['wmf.table/year=2022/month=1/dc=1', 'wmf.table/year=2022/month=1/dc=2']
"""
_partitions = _add_options_to_partitions(
pre=False,
partitions=partitions,
partition_options=post_partitions
)
return ['/'.join(partition) for partition in _partitions]
def _add_options_to_partitions(
pre: bool,
partitions: List[str],
partition_options: PrePartitions) -> List[List[str]]:
"""
Add the options before or after the other partitions
eg: input: True, ['wmf.table', 'type=1'], ['dc=a', 'dc=b']
output: [['wmf.table', 'dc=a', 'type=1'], ['wmf.table', 'dc=b', 'type=1']]
"""
if not partition_options:
return [partitions]
result = []
for options in _build_pre_partitions_options(pre_partitions):
partition_copy = partition.copy()
for options in _build_partitions_options(partition_options):
partition_copy = partitions.copy()
options.reverse()
for option in options:
partition_copy.insert(1, option)
if pre:
partition_copy.insert(1, option)
else:
partition_copy.append(option)
result.append(partition_copy)
return result
# TODO: what is pre_partitions type???
def _build_pre_partitions_options(pre_partitions: PrePartitions) -> List[List[str]]:
def _build_partitions_options(pre_partitions: PrePartitions) -> List[List[str]]:
"""Build the different options of partitions resulting from the different
options of pre_partitions.
......