Commit 54d3d4a7 authored by Gmodena's avatar Gmodena
Browse files

T295360 datapipeline scaffolding

parent e2835e50
FROM continuumio/miniconda3
RUN conda install python=3.7 openjdk=8.0.152
RUN pip install tox==3.24.4
RUN pip install tox==3.24.4 cookiecutter==1.7.3
include Makefile.python
# A space (" ") separated list of projects to build and deploy.
TARGETS = "image-matching"
# Define Gitlab project paths
branch := $(shell git rev-parse --abbrev-ref HEAD)
short_commit_hash := $(shell git rev-parse --short=8 HEAD)
airflow_host := an-airflow1003.eqiad.wmnet
......@@ -13,33 +17,51 @@ gitlab_ci_api_root :=
gitlab_package_archive := platform-airflow-dags.tar.gz
platform_airflow_dags_url := ${gitlab_ci_api_root}/projects/${gitlab_project_id}/packages/generic/${gitlab_project_name}/${gitlab_package_version}/${gitlab_package_archive}
ima_home := image-matching
ima_venv_archive := venv.tar.gz
ifneq ($(SKIP_DOCKER),true)
lint-all: docker-conda
test-all: docker-conda
test-dags: docker-conda
datapipeline: docker-conda
venv_archive := ${venv}.${venv_archive_format} # inherited from Makefile.python
# Runs some command to setup DAGs, venvs and project code on an airflow worker.
# TODO(gmodena): WARNING this command could leave the remote directory in a dirty state.
# Project directories in ${airflow_home} are removed based on tagets declared in ${TARGET}.
# If a pipeline is removed from the ${TARGETS} variable, its matching remote
# path won't be deleted. Proper cleanup requires manual intervention.
ssh ${airflow_host} 'sudo -u ${airflow_user} rm -r ${airflow_home}/dags/*'
ssh ${airflow_host} 'sudo -u ${airflow_user} rm -r ${airflow_home}/image-matching'
ssh ${airflow_host} 'sudo -u ${airflow_user} tar xzf ${gitlab_package_archive} -C ${airflow_home}'
ssh ${airflow_host} 'sudo -u ${airflow_user} mkdir ${airflow_home}/image-matching/venv'
ssh ${airflow_host} 'sudo -u ${airflow_user} tar xvzf ${airflow_home}/image-matching/venv.tar.gz -C ${airflow_home}/image-matching/venv'
ssh ${airflow_host} 'rm ${gitlab_package_archive}'
rm -f ${ima_home}/${ima_venv_archive}
cd ${ima_home}; make venv
for target in $(shell echo ${TARGETS}); do \
ssh ${airflow_host} "sudo -u ${airflow_user} rm -r ${airflow_home}/$$target"; \
ssh ${airflow_host} "sudo -u ${airflow_user} mkdir -p ${airflow_home}/$$target/venv"; \
ssh ${airflow_host} "sudo -u ${airflow_user} tar xzf ${gitlab_package_archive} -C ${airflow_home}";
for target in $(shell echo ${TARGETS}); do \
ssh ${airflow_host} "sudo -u ${airflow_user} tar xvzf ${airflow_home}/$$target/${venv_archive} -C ${airflow_home}/$$target/venv"; \
ssh ${airflow_host} "rm ${gitlab_package_archive}"
## Code checks
# Run linting on all projects
cd ${ima_home}; make lint
for target in $(shell echo ${TARGETS}); do \
make lint -C $$target; \
# Run the tests suite on all projects
for target in $(shell echo ${TARGETS}); do \
make test -C $$target; \
# Run compile-time type checks on all projects.
for target in $(shell echo ${TARGETS}); do \
make mypy -C $$target; \
# Run the top level airflow dags test suite
test-dags: ${pip_requirements_test}
${DOCKER_CMD} bash -c "tox -e dags"
......@@ -47,13 +69,16 @@ test_dags:
echo "WARNING: deprecated. Use make test-dags instead"
make test-dags
cd ${ima_home}; make test
## Package dags and project dependencies.
archive: ima-venv
tar cvz --exclude='.[^/]*' --exclude='__pycache__' --exclude='venv/' -f platform-airflow-dags.tar.gz *
# Build a virtual environment for a datapipeline project.
for target in $(shell echo ${TARGETS}); do \
rm -f $$target/${venv_archive}; \
make venv -C $$target; \
# Archive the projects and virtual environments.
# This is the artifact that will be deployed on ${gitlab_package_archive}.
tar cvz --exclude='.[^/]*' --exclude='datapipeline-scaffold/*' --exclude='__pycache__' --exclude='venv/*' --exclude=${gitlab_package_archive} -f ${gitlab_package_archive} dags $(shell echo ${TARGETS})
# Publish an artifact to a Gitlab Generic Archive registry using a private token.
publish: archive
......@@ -80,3 +105,9 @@ deploy-gitlab-build:
scp ${gitlab_package_archive} ${airflow_host}:
make install-dags
## Scaffolding
# Create a new datapipeline template
@${DOCKER_CMD} bash -c "cookiecutter datapipeline-scaffold"
......@@ -16,6 +16,7 @@ CONDA_CMD := conda config --set pip_interop_enabled True; \
DOCKER_IMG := platform/miniconda3
DOCKERFILE := Dockerfile.python
DOCKER_PLATFORM := linux/amd64
ifneq ($(SKIP_DOCKER),true)
CURRENT_DIR := $(shell pwd)
......@@ -31,7 +32,7 @@ venv: docker-conda
docker build -t ${DOCKER_IMG} -f ${DOCKERFILE} .
docker buildx build --platform ${DOCKER_PLATFORM} -t ${DOCKER_IMG} -f ${DOCKERFILE} .
venv: ${pip_requirements}
${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
......@@ -39,7 +40,6 @@ venv: ${pip_requirements}
conda deactivate; \
conda install conda-pack; \
conda-pack -n ${venv} --format ${venv_archive_format}"
mypy: ${pip_requirements_test}
${DOCKER_CMD} bash -c "tox -e mypy"
......@@ -11,6 +11,10 @@ You can reach out to us at
* TODO: Add irc channel
* Slack: [#data-platform-value-stream](
# Requirements
Tools provided by this repository require [Docker](
# Data pipelines
> […] a pipeline, also known as a data pipeline, is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion. […] >
......@@ -22,6 +26,32 @@ A Generated Datasets Platform pipeline is made up by two components:
Data pipelines are executed on Hadoop. Elastic compute is provided by Spark (jobs are deployed in cluster mode). Scheduling and orchestration is delegated to Apache Airflow. Currently we support Python based projects. Scala support is planned.
## Create a new data pipeline
Clone this repo and create a dev branch with:
cd platform-airflow-dag
git checkout -b your_data_pipeline_branchname
A new datapipline can be created with:
make datapipeline
This will generate a new directory for pipeline code under:
And install an Airflow dag template under
## Repo layout
This repository follows a [monorepo]( strategy. Its structure matches the layout of `AIRFLOW_HOME` on the [an-airflow1003.eqiad.wmnet]( airflow instance.
......@@ -38,6 +68,16 @@ The following command will run code checks and deploy data pipelines:
make deploy-local-build
### Deploy a new pipeline
Deployment piplines are declared in the `TARGET` variable in `Makefile`.
To deploy a new pipeline, append its project directory name to `TARGET`.
For example, if a new pipeline has been created as `my_new_datapipeline`, the new
`TARGET` list would look like the following:
TARGET := "image-matching my_new_datapipeline"
# CI & code checks
owner: null
run_as_owner: true
depends_on_past: false
retries: 1
retry_delay: 5
catchup: false
import abc
import getpass
import logging
import os
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import List, Optional
import yaml
from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.bash import BashOperator
from airflow.utils.helpers import chain
LOGGER = logging.getLogger("airflow.task")
config_path = os.path.dirname(Path(__file__))
def _load_config() -> dict:
with open(
os.path.join(config_path, "../", "config", "sequence.yaml")
) as config_file:
config = yaml.safe_load(config_file)
default_args = {
"owner": config.get(
"owner", getpass.getuser()
), # User running the job (default_user: airflow)
"run_as_owner": config.get("run_as_owner", True),
"depends_on_past": config.get("depends_on_past", False),
"retries": config.get("retries", 1),
"retry_delay": timedelta(minutes=int(config.get("retry_delay", 5))),
"catchup": config.get("catchup", False),
return default_args
class SparkConfig:
SparkConfig is a dataclass that represents a set of Spark
configuration settings. It provides boilerplate to:
- override default-spark.conf with a user-provided properties file.
- configure a Python virtual env.
pipeline: str
pipeline_home: str = "/srv/airflow-platform_eng"
def _load_properties(self, props: List[str]) -> str:
Parses a list of properties.
:param props: a list of properties.
:returns conf: a trimmed string of Spark command line arguments.
conf = ""
for line in props:
line = line.strip()
if line and not line.startswith("#"):
key, value = line.split("\t")
conf += f"--conf '{key}={value}' "
return conf.strip()
def venv(self) -> str:
:returns: the absolute path to the Python virtual environment.
return os.path.join(self.pipeline_home, self.pipeline, "pyspark", "venv")
def venv_archive(self) -> str:
:returns: the absolute path to the Python virtual environment
(aliased) archive. For example
return os.path.join(
self.pipeline_home, self.pipeline, "pyspark", "venv.tar.gz#venv"
def properties(self) -> str:
Extracts settings from a properties file and generates
a string of Spark command line arguments in the
form --conf 'key=value'.
:returns: a string of command line arguments.
conf = ""
properties_file = os.path.join(
self.pipeline_home, self.pipeline, "conf", ""
if os.path.exists(properties_file):
# ConfigParser does not support header-less properties files.
# This use case is simple and specific enough to roll our own parsing
# logic, rather than mangling or installing external deps.
with open(properties_file) as infile:
conf = self._load_properties(infile.readlines())
LOGGER.warning(" not found at {properties_file}.")
return conf
class Task(abc.ABC):
Task interface for configuration dataclasses
def operator(self, dag: Optional[DAG] = None) -> BaseOperator:
:param dag: an Airflow dag.
:returns an Airflow operator that executes the task
class PySparkTask(Task):
PySparkTask is a dataclass that represents a spark-submit command.
main: str
input_path: str
output_path: str
config: SparkConfig
pyspark_main_args: Optional[str] = ""
def operator(self, dag: Optional[DAG] = None) -> BashOperator:
TODO(gmodena): once available, this method should return
an instance of SparkSubmitOperator.
:param dag: an Airflow dag.
:returns: a BashOperator that runs spark-submit.
return BashOperator(
bash_command=f"PYSPARK_PYTHON=./venv/bin/python "
f"PYSPARK_DRIVER_PYTHON={self.config.venv()}/python spark2-submit "
f"{} "
f"--archives {self.config.venv_archive()} "
"--conf 'spark.yarn.appMasterEnv.PYSPARK_PYTHON=./venv/bin/python' "
"--conf 'spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./venv/bin/python' "
f"{self.main} {self.pyspark_main_args} "
f"{self.input_path} {self.output_path} ",
class SparkSqlTask(Task):
SparkSqlTask is a dataclass that represents a spark-sql command.
:param hiveconf_args: a string of "--hiveconf <property=value>" parameters.
:param config: a spark config.
:param filename: path to a file containing an HQL query.
config: SparkConfig
filename: Path
hiveconf_args: Optional[str] = ""
def operator(self, dag: Optional[DAG] = None) -> BashOperator:
Executes a Hive/SparkSQL query.
Cluster deploy mode is not applicable to Spark SQL shell.
bash_command will enforce --deploy-mode client.
TODO(gmodena): once available, this method should return
an instance of SparkSqlOperator.
:param dag: an Airflow dag.
:returns: a BashOperator that runs spark-sql.
return BashOperator(
bash_command=f"spark2-sql "
f"{} "
"--deploy-mode client "
f"{self.hiveconf_args} "
f"-f {self.filename} ",
def generate_dag(pipeline: str, tasks: List[Task], dag_args: dict = {}) -> DAG:
Chains together a List of operators to form an Airflow DAG.
This is equivalent to:
>>> op1 >> op2 >> ... >> opN
:param pipeline: the data pipeline name.
:param tasks: a list of Airflow operators.
:param dag_args: a dictionary of Airflow configuration arguments.
:retruns dag: an Airflow DAG.
default_args = _load_config()
with DAG(
tags=[pipeline, "generated-data-platform", "devel"],
) as dag:
operators = [t.operator(dag=dag) for t in tasks]
return dag
ima_home: = "/srv/airflow-platform_eng/image-matching"
run_id: = "8419345a-3404-4a7c-93e1-b9e6813706ff"
image_suggestion_dir: f"/srv/airflow-platform_eng/image-matching/"
snapshot: "2021-09-08"
hive_user_db: "analytics_platform_eng"
A [cookiecutter]( template to scaffold Generated Data Platform
## Hooks
Post project gen hooks under `hooks/` will move dag templates
under the repo toplevel `dags` directory.
"pipeline_name": "Generated Data Pipeline",
"pipeline_directory": "{{ cookiecutter.pipeline_name.lower().replace('-', '_').replace(' ', '_') }}",
"pipeline_owner": "Some Dataset Producer"
import os
import shutil
source_dir = os.getcwd()
dags_dir = "../dags"
# Move dags to the monorepo top dir
os.path.join(source_dir, "dags", "{{cookiecutter.pipeline_directory}}"), dags_dir
os.path.join(source_dir, "dags", "config", "{{cookiecutter.pipeline_directory}}.yaml"),
os.path.join(dags_dir, "config"),
shutil.rmtree(os.path.join("../", "{{cookiecutter.pipeline_directory}}", "dags"))
python_dir := pyspark
test ${python_dir}/venv.tar.gz || rm ${python_dir}/venv.tar.gz; \
test -d ${python_dir}/venv || rm -r ${python_dir}/venv; \
make venv -C ${python_dir}; mv ${python_dir}/venv.tar.gz .
make test -C pyspark
make lint -C pyspark
make mypy -C pyspark
# {{ cookiecutter.pipeline_name }}
> A boilerplate for your Generate Data Platform pipeline.
Take a look at our [documentation]() before getting started.
This pipeline is owned by {{cookiecutter.pipeline_owner}}.
# Guidelines
In order to get the best out of the template:
* Don't modify the Makefiles and Dockerfiles we provide.
* Don't remove any lines from the tox.ini file we provide.
* Don't commit data to git.
* Don't commit any credentials or local configuration files.
* Convert Jupyter Notebooks you'd like to schedule to a script with `jupyter nbconvert --to script notebook.ipynb`.
* Install Docker or Docker Desktop on your development machine.
You can read more about our guidelines, codechecks and contribution model
in our [documentation]().
# Content
- `conf` contains Spark job specific config files. `` will let you define your cluster topology and
desired resources. We default to a [yarn-regular]( sized cluster.
- `pyspark` contains Spark based data processing tasks.
- `sql` contains SQL/HQL based data processing tasks.
- `test` contains a test suite
An Airflow DAG template has been created in the monorepo top level dags directory.
# Codechecks
A valid project is expected to pass the following code checks:
* Compile time type-checking
* Unit tests
* Linting
* DAG validation tests
Code checks are triggered automatically after a `git push`, or when a merge request
is opened. The following sections describe how to trigger checks manually.
## Test
Test in a Docker container
make test
Or on the native system:
make test SKIP_DOCKER=true
## Lint
Lint in a Docker container
make lint
Or on the native system:
make lint SKIP_DOCKER=true
## Type checking
Type-check code in a Docker container
make mypy
Or on the native system:
make mypy SKIP_DOCKER=true
spark.master yarn
spark.submit.deployMode cluster
# Cluster topology for regular sized jobs (15% resource utilisation)
spark.driver.memory 2g
spark.dynamicAllocation.maxExecutors 64
spark.executor.memory 8g
spark.executor.cores 4
spark.sql.shuffle.partitions 256
# Job specific config
spark.sql.sources.partitionOverwriteMode dynamic
# Append spark-defaults.conf from:
# /usr/lib/spark2/conf/spark-defaults.conf
pipeline_home: "/srv/airflow-platform_eng/{{cookiecutter.pipeline_directory}}"
A boilerplate module to help you get started with running the
{{cookiecutter.pipeline_directory}} pipeline on Wikimedia's
Airflow infastructure.
This module shows how use PySparkTask and SparkSqlTask dataclasses
to spin up a spark cluster, and submit pyspark and SparkSql jobs
according to Generated Data Platform conventions.
Given a list of task configurations, and Airflow DAG is generated by
import os.path
from pathlib import Path
import yaml
from factory.sequence import PySparkTask, SparkConfig, SparkSqlTask, generate_dag
from airflow.utils.dates import days_ago
config_path = os.path.dirname(Path(__file__))
pipeline_config = os.path.join(
config_path, "config", "{{cookiecutter.pipeline_directory}}.yaml"
with open(pipeline_config) as config_file:
# {{cookiecutter.pipeline_directory}}_config.yaml contains airflow
# and project specific settings.
config = yaml.safe_load(config_file)
# dag_args specifies the pipeline start date and schedule interval.
# It can be extended with any valid Airflow configuration setting.
# A list of all available default_args can be found at
# for more details
dag_args = {
"start_date": config.get("start_date", days_ago(1)),
"schedule_interval": config.get("schedule_interval"),