Commit 980c67ec authored by Gmodena's avatar Gmodena
Browse files

Merge branch 'multi-project-dags-repo' of...

Merge branch 'multi-project-dags-repo' into multi-project-dags-repo
parents cbee21e9 b8a8a95e
Pipeline #1071 canceled with stages
......@@ -121,3 +121,5 @@ venv.bak/
# Makefile
# idea
image: "continuumio/miniconda3"
- build
- test
- publish
# TODO(gmodena): cross-project artifacts download
# are not available in Gitlab Freemium.
# stage: build-ima
# script:
# - ls -lhR
# needs:
# - project: gmodena/ImageMatching
# job: build
# ref: add-gitlab-ci
# artifacts: true
# curl is not installed by default on continuumio/miniconda3.
- conda install -y make curl
# Install IMA dependencies in a virtual environment
# and publish it as a build artifact.
# TODO(gmodena): venvs produced by conda-pack and venv-pack
# exceed the artifact size limit provided by gitlab.
# Currently env and dep building is delegated to the "deploy"
# host. In our case, the developer workstation where
# `make deploy` is executed.
# stage: build
# script:
# - make -C image-matching venv
# artifacts:
# paths:
# - image-matching/venv.tar.gz
# expire_in: "1 hour"
# Install Apache Spark in the test image.
# TODO(gmodena): we should have a polyglot Docker image with jdk8, python3,
# and spark pre-installed
stage: test
- make test SKIP_DOCKER=true
stage: publish
- make archive
- 'curl -v --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file /tmp/platform-airflow-dags.tar.gz "${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/platform-airflow-dags/${CI_COMMIT_REF_NAME}-${CI_COMMIT_SHORT_SHA}/platform-airflow-dags.tar.gz"'
# --- Deployment configuration ---------------------------------------------
# Package version is derived from the current git branch and short commit
# hash, matching the ${CI_COMMIT_REF_NAME}-${CI_COMMIT_SHORT_SHA} scheme
# used by the CI publish job.
branch := $(shell git rev-parse --abbrev-ref HEAD)
short_commit_hash := $(shell git rev-parse --short=8 HEAD)
# Target Airflow worker, and the user/home the DAGs are installed under.
airflow_host := an-airflow1003.eqiad.wmnet
airflow_user := analytics-platform-eng
airflow_home := /srv/airflow-platform_eng/
environment := dev
# Gitlab Generic Package registry coordinates for the published artifact.
gitlab_project_id := 56
gitlab_project_name := platform-airflow-dags
gitlab_package_version := ${branch}-${short_commit_hash}
# NOTE(review): intentionally empty? Looks like it must be supplied on the
# command line or via the environment — TODO confirm.
gitlab_ci_api_root :=
gitlab_package_archive := platform-airflow-dags.tar.gz
# Download URL of the published archive in the Generic Package registry.
platform_airflow_dags_url := ${gitlab_ci_api_root}/projects/${gitlab_project_id}/packages/generic/${gitlab_project_name}/${gitlab_package_version}/${gitlab_package_archive}
# Image-matching sub-project layout.
ima_home := image-matching
ima_venv_archive := venv.tar.gz
# Runs some command to setup DAGs, venvs and project code on an airflow worker.
ssh ${airflow_host} 'sudo -u ${airflow_user} rm -rf ${airflow_home}/image-matching/venv'
ssh ${airflow_host} 'sudo -u ${airflow_user} mkdir ${airflow_home}/image-matching/venv'
ssh ${airflow_host} 'sudo -u ${airflow_user} tar xvzf ${airflow_home}/image-matching/venv.tar.gz -C ${airflow_home}/image-matching/venv'
ssh ${airflow_host} 'rm ${gitlab_package_archive}'
rm -f ${ima_home}/${ima_venv_archive}
make -C ${ima_home} venv
cd ${ima_home}; make test
# Bundle the repository contents (hidden files excluded) into the deployable
# archive, after the image-matching venv has been (re)built by ima-venv.
archive: ima-venv
	tar cvz --exclude=".*" -f ${gitlab_package_archive} .
# Publish an artifact to a Gitlab Generic Archive registry using a private token.
# Refuses to publish from a dirty working tree.
publish: archive
	# Each recipe line runs in its own shell, so the variable assignment and
	# the check must share one line. `$$` escapes `$` from Make so the shell
	# (not Make) performs the command substitution; previously Make expanded
	# `$(git status --porcelain)` to the empty string, the variable was set in
	# a different shell than the one testing it, and `|| echo` did not abort.
	status="$$(git status --porcelain)"; test "x$$status" = "x" || { echo "Working directory is dirty. Aborting."; exit 1; }
	#curl -v --header "PRIVATE-TOKEN: ${GITLAB_PRIVATE_TOKEN}" --upload-file /tmp/platform-airflow-dags.tar.gz "${gitlab_ci_api_root}/projects/${gitlab_project_id}/packages/generic/platform-airflow-dags/${branch}-${short_commit_hash}/${gitlab_package_archive}"
# Test, assemble venvs, generate an archive locally and ship it to the airflow worker.
deploy-local-build: test archive
	scp ${gitlab_package_archive} ${airflow_host}:
	# Recursive invocations must use $(MAKE), not literal `make`, so that
	# -j/-n flags and the jobserver are propagated correctly.
	$(MAKE) install-dags
# Delegate CI to gitlab, and ship a successfully built artifact to the airflow worker.
curl --fail -o ${gitlab_package_archive} ${platform_airflow_dags_url}
scp ${gitlab_package_archive} ${airflow_host}:
make install-dags
# Authors
# Clara Andrew-Wani 2021 (
from datetime import timedelta
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
......@@ -24,7 +24,7 @@ default_args = {
"retry_delay": timedelta(minutes=5),
"start_date": days_ago(1),
"catchup": True,
"schedule_interval": "@once",
"schedule_interval": None,
with DAG(
......@@ -37,16 +37,16 @@ with DAG(
image_suggestion_dir = os.environ.get("IMAGE_SUGGESTION_DIR", f'/srv/airflow-platform_eng/image-matching/')
# TODO: Undo hardcode, use airflow generated run id
run_id = '8419345a-3404-4a7c-93e1-b9e6813706ff'
snapshot = '2021-07-26'
monthly_snapshot = '2021-07'
snapshot = "{{ dag_run.conf['snapshot'] or '2021-09-08' }}"
monthly_snapshot = datetime.fromisoformat(snapshot).strftime('%Y-%m')
username = getpass.getuser()
hive_user_db = 'analytics_platform_eng'
config = configparser.ConfigParser()
ima_home = '/srv/airflow-platform_eng/image-matching'
# wikis = config.get("poc_wikis", "target_wikis")
# wikis = wikis.split()
wikis = ['kowiki', ]
wikis = ['kowiki', 'plwiki']
# Create directories for pipeline
algo_outputdir = os.path.join(image_suggestion_dir, f'runs/{run_id}/Output')
......@@ -73,13 +73,13 @@ with DAG(
# TODO: Look into SparkSubmitOperator
generate_placeholder_images = BashOperator(
bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON=python spark2-submit --archives {image_suggestion_dir}/venv.tar.gz#venv --properties-file {spark_config} {image_suggestion_dir}/python/'
bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON={ima_home}/venv/bin/python spark2-submit --properties-file /srv/airflow-platform_eng/image-matching/runs/{run_id}/ --archives {ima_home}/venv.tar.gz#venv {ima_home}/venv/bin/ {snapshot}'
# Update hive external table metadata
update_imagerec_table = BashOperator(
bash_command=f'hive -hiveconf username={username} -f {image_suggestion_dir}/ddl/external_imagerec.hql'
bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -f {image_suggestion_dir}/sql/external_imagerec.hql'
......@@ -87,7 +87,7 @@ with DAG(
for wiki in wikis:
algo_run = BashOperator(
bash_command=f'spark2-submit --properties-file {spark_config} {image_suggestion_dir}/python/'
bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON={ima_home}/venv/bin/python spark2-submit --properties-file {ima_home}/runs/{run_id}/ --archives {ima_home}/venv.tar.gz#venv {ima_home}/venv/bin/ {snapshot} {wiki}'
# Sensor for finished algo run
......@@ -106,8 +106,8 @@ with DAG(
upload_imagerec_to_hdfs = BashOperator(
bash_command=f'spark2-submit --properties-file {spark_config} --master {spark_master_local} \
--files {image_suggestion_dir}/etl/ \
{image_suggestion_dir}/etl/ \
--files {image_suggestion_dir}/spark/ \
{image_suggestion_dir}/spark/ \
--wiki {wiki} \
--snapshot {monthly_snapshot} \
--source file://{algo_outputdir}/{wiki}_{snapshot}_wd_image_candidates.tsv \
......@@ -115,16 +115,14 @@ with DAG(
# Link tasks
generate_spark_config >> generate_placeholder_images >> algo_run
algo_run >> raw_dataset_sensor
raw_dataset_sensor >> upload_imagerec_to_hdfs >> update_imagerec_table
generate_spark_config >> generate_placeholder_images >> algo_run >> raw_dataset_sensor >> upload_imagerec_to_hdfs >> update_imagerec_table
# Generate production data
hdfs_imagerec_prod = f'/user/{username}/imagerec_prod'
generate_production_data = BashOperator(
bash_command=f'spark2-submit --properties-file {spark_config} --files {image_suggestion_dir}/etl/ \
{image_suggestion_dir}/etl/ \
bash_command=f'spark2-submit --properties-file {spark_config} --files {image_suggestion_dir}/spark/ \
{image_suggestion_dir}/spark/ \
--snapshot {monthly_snapshot} \
--source {hdfs_imagerec} \
--destination {hdfs_imagerec_prod} \
......@@ -134,7 +132,7 @@ with DAG(
# Update hive external production metadata
update_imagerec_prod_table = BashOperator(
bash_command=f'hive -hiveconf username={username} -f {image_suggestion_dir}/ddl/external_imagerec_prod.hql'
bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -f {image_suggestion_dir}/sql/external_imagerec_prod.hql'
for wiki in wikis:
......@@ -142,7 +140,7 @@ with DAG(
# Export production datasets
export_prod_data = BashOperator(
bash_command=f'hive -hiveconf username={username} -hiveconf output_path={tsv_tmpdir}/{wiki}_{monthly_snapshot} -hiveconf wiki={wiki} -hiveconf snapshot={monthly_snapshot} -f {image_suggestion_dir}/ddl/export_prod_data.hql > {tsv_tmpdir}/{wiki}_{monthly_snapshot}_header'
bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -hiveconf output_path={tsv_tmpdir}/{wiki}_{monthly_snapshot} -hiveconf wiki={wiki} -hiveconf snapshot={monthly_snapshot} -f {image_suggestion_dir}/sql/export_prod_data.hql > {tsv_tmpdir}/{wiki}_{monthly_snapshot}_header'
# Sensor for production data
......@@ -163,3 +161,4 @@ with DAG(
update_imagerec_table >> generate_production_data >> update_imagerec_prod_table
update_imagerec_prod_table >> export_prod_data
export_prod_data >> production_dataset_sensor >> append_tsv_header
# bash is required: CONDA_CMD below relies on `source ~/.bashrc` and the
# conda activation hooks, which plain sh does not support.
SHELL := /bin/bash
# Virtualenv build settings.
venv := venv
venv_archive_format := tar.gz
pip_requirements := requirements.txt
pip_requirements_test := requirements-test.txt
# Simple (:=) assignment for plain constants, consistent with the rest of
# this file; recursive `=` would re-expand on every reference for no benefit.
conda_python_version := 3.7
pyspark_version := 2.4.5
# Optional extra package index (e.g. an internal PyPI mirror); empty by default.
extra_pypi :=
# Create, initialize and activate the conda env used by the recipes below.
CONDA_CMD := conda config --set pip_interop_enabled True; conda create -n ${venv} python=${conda_python_version}; conda init bash; source ~/.bashrc && conda activate ${venv}
ifneq ($(SKIP_DOCKER),true)
DOCKER_IMG := continuumio/miniconda3
DOCKER_CMD := docker run -it \
--rm \
-v /Users/gmodena/repos/gitlab/platform-airflow-dags/image-matching:/root \
-e SKIP_DOCKER=true \
-w /root ${DOCKER_IMG}
# Build the project virtualenv inside the (optionally dockerized) conda
# environment, install the pinned dependencies, then pack the env into a
# relocatable archive with conda-pack.
venv: ${pip_requirements}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
	pip install --extra-index-url ${extra_pypi} -r ${pip_requirements}; \
	conda deactivate; \
	conda install conda-pack; \
	conda-pack -n ${venv} --format ${venv_archive_format}"
# Run the pytest suite (with coverage over spark/) inside the conda env;
# openjdk and a pinned pyspark are installed first since the tests exercise
# Spark jobs locally.
test: ${pip_requirements_test}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
	conda install openjdk pyspark==${pyspark_version}; \
	pip install -r ${pip_requirements_test}; \
	PYTHONPATH=${PYTHONPATH}:spark/ pytest --cov spark tests/"
......@@ -2,20 +2,23 @@
Training and dataset publishing pipeline for the [Image Suggestion]( service.
Airflow DAG for model training and etl.
Data pipeline for model training and etl.
## Content
- `dags` contains the airflow dags for this workflow.
- `conf` contains job specific config files.
- `spark` contains Spark based data processing tasks.
- `sbin` contains python scripts for data processing tasks.
- `sql` contains SQL/HQL based data processing tasks.
- `test` contains a test suite
## Test
Test in a Docker container:
make test
python3 -m venv venv
source venv/bin/activate
pip install -r requirements-test.txt
PYTHONPATH=spark python3 -m test
Test on the native system:
make test SKIP_DOCKER=true
spark.master yarn
spark.submit.deployMode cluster
spark.submit.deployMode client
# Cluster topology for regular sized jobs (15% resource utilisation)
spark.driver.memory 2g
\ No newline at end of file
import argparse
import papermill as pm
import os
# Todo: find a more accurate way to get dblist.
all_languages = languages = ['enwiki', 'arwiki', 'kowiki', 'cswiki', 'viwiki', 'frwiki', 'fawiki', 'ptwiki',
'ruwiki', 'trwiki', 'plwiki', 'hewiki', 'svwiki', 'ukwiki', 'huwiki', 'hywiki',
'srwiki', 'euwiki', 'arzwiki', 'cebwiki', 'dewiki', 'bnwiki']
class AlgoRunner(object):
def __init__(self, snapshot, languages, output_dir):
:param str languages: A list of the languages separated by a comma to run against the algorithm.
:param str snapshot: Snapshot date
:param str output_dir: Directory to place output .ipynb and .tsv files
self.snapshot = snapshot
self.languages = languages.split(',')
self.output_dir = output_dir
print(f'Initializing with snapshot={self.snapshot} languages={self.languages} output_dir={self.output_dir}')
def run(self):
if len(self.languages) == 1 and self.languages[0] == 'All':
def execute_papermill(self, languages):
Executes jupyter notebook
:param list languages: List of languages to run against the algorithm
print(f'Starting to execute the algorithm for the following languages: {languages}')
if not os.path.exists(self.output_dir):
for language in languages:
self.output_dir + '/' + language + '_' + self.snapshot + '.ipynb',
parameters=dict(language=language, snapshot=self.snapshot, output_dir=self.output_dir)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Executes jupyter notebook with parameters. ' +
'Ex: python3 2020-12-28 hywiki Output')
parser.add_argument('snapshot', help='Full snapshot date. Ex: 2020-12-28')
parser.add_argument('languages', nargs='?', default='All',
help='Language(s) to execute. If more than one separate with comma. Ex: enwiki,kowiki,arwiki')
parser.add_argument('output_dir', nargs='?', default='Output',
help='Directory to place output .ipynb and .tsv files. Defaults to: Output')
args = parser.parse_args()
runner = AlgoRunner(args.snapshot, args.languages, args.output_dir)
......@@ -4,6 +4,10 @@ from enum import Enum
class InstancesToFilter(Enum):
    # Wikidata item ids (QIDs) of "instance of" classes whose pages are
    # filtered out of the candidate set.
    # NOTE(review): QID-to-label mapping assumed from member names
    # (e.g. Q577 = "year") — verify against Wikidata.
    YEAR = "Q577"
    FAMILYNAME = "Q101352"
    NAME = "Q82799"
    LIST = "Q13406463"
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import argparse
spark = SparkSession.builder.getOrCreate()
def parse_args():
parser = argparse.ArgumentParser(
description="Transform raw algo output to production datasets"
parser.add_argument("--snapshot", help="Monthly snapshot date (YYYY-MM-DD)")
parser.add_argument("--source", help="Source dataset path")
parser.add_argument("--destination", help="Destination path")
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
snapshot = args.snapshot.split("-")
source = args.source
destination = args.destination
year = snapshot[0]
month = snapshot[1]
day = snapshot[2]
num_partitions = 1
df =
.filter(F.col("is_article_page") == True)
.withColumn("page_namespace", F.lit(0))
.withColumn("recommendation_type", F.lit('image'))
.withColumn("year", F.lit(year))
.withColumn("month", F.lit(month))
.withColumn("day", F.lit(day))
.withColumnRenamed("wiki", "wikiid")
.withColumn("page_id", df.page_id.cast('int'))
.write.partitionBy("year", "month", "day")
.mode("overwrite") # Requires dynamic partitioning enabled
%% Cell type:code id: tags:
``` python
from wmfdata.spark import get_session
%% Cell type:code id: tags:
``` python
# We use wmfdata boilerplate to init a spark session.