Commit 4cec0127 authored by Gmodena's avatar Gmodena
Browse files

run IMA data pipeline on an-airflow.

parent 3dc52f1f
......@@ -121,3 +121,5 @@ venv.bak/
# Makefile
# idea
image: "continuumio/miniconda3"
- build
- test
- publish
# TODO(gmodena): cross-project artifacts download
# are not available in Gitlab Freemium.
# stage: build-ima
# script:
# - ls -lhR
# needs:
# - project: gmodena/ImageMatching
# job: build
# ref: add-gitlab-ci
# artifacts: true
# curl is not installed by default on continuumio/miniconda3.
- conda install -y make curl
# Install IMA dependencies in a virtual environment
# and publish it as a build artifact.
# TODO(gmodena): venvs produced by conda-pack and venv-pack
# exceed the artifact size limit provided by gitlab.
# Currently env and dep building is delegated to the "deploy"
# host. In our case, the developer workstation where
# `make deploy` is executed.
# stage: build
# script:
# - make -C image-matching venv
# artifacts:
# paths:
# - image-matching/venv.tar.gz
# expire_in: "1 hour"
# Install Apache Spark in the test image.
# TODO(gmodena): we should have a polyglot Docker image with jdk8, python3,
# and spark pre-installed
stage: test
- make test SKIP_DOCKER=true
stage: publish
- make archive
- 'curl -v --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file /tmp/platform-airflow-dags.tar.gz "${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/platform-airflow-dags/${CI_COMMIT_REF_NAME}-${CI_COMMIT_SHORT_SHA}/platform-airflow-dags.tar.gz"'
branch := $(shell git rev-parse --abbrev-ref HEAD)
short_commit_hash := $(shell git rev-parse --short=8 HEAD)
airflow_host := an-airflow1003.eqiad.wmnet
airflow_user := analytics-platform-eng
airflow_home := /srv/airflow-platform_eng/
environment := dev
gitlab_project_id := 56
gitlab_project_name := platform-airflow-dags
gitlab_package_version := ${branch}-${short_commit_hash}
gitlab_ci_api_root :=
gitlab_package_archive := platform-airflow-dags.tar.gz
platform_airflow_dags_url := ${gitlab_ci_api_root}/projects/${gitlab_project_id}/packages/generic/${gitlab_project_name}/${gitlab_package_version}/${gitlab_package_archive}
ima_home := image-matching
ima_venv_archive := venv.tar.gz
# Runs some command to setup DAGs, venvs and project code on an airflow worker.
ssh ${airflow_host} 'sudo -u ${airflow_user} rm -rf ${airflow_home}/image-matching/venv'
ssh ${airflow_host} 'sudo -u ${airflow_user} mkdir ${airflow_home}/image-matching/venv'
ssh ${airflow_host} 'sudo -u ${airflow_user} tar xvzf ${airflow_home}/image-matching/venv.tar.gz -C ${airflow_home}/image-matching/venv'
ssh ${airflow_host} 'rm ${gitlab_package_archive}'
rm -f ${ima_home}/${ima_venv_archive}
make -C ${ima_home} venv
cd ${ima_home}; make test
archive: ima-venv
tar cvz --exclude=".*" -f ${gitlab_package_archive} .
# Publish an artifact to a Gitlab Generic Archive registry using a private token.
publish: archive
status=$(git status --porcelain)
test "x$(status)" = "x" || echo "Echo Working directory is dirty. Aborting."
#curl -v --header "PRIVATE-TOKEN: ${GITLAB_PRIVATE_TOKEN}" --upload-file /tmp/platform-airflow-dags.tar.gz "${gitlab_ci_api_root}/projects/${gitlab_project_id}/packages/generic/platform-airflow-dags/${branch}-${short_commit_hash}/${gitlab_package_archive}"
# Test, assemble venvs, generate an archive locally and ship it to the airflow worker.
deploy-local-build: test archive
scp ${gitlab_package_archive} ${airflow_host}:
make install-dags
# Delegate CI to gitlab, and ship a successfully built artifact to the airflow worker.
curl --fail -o ${gitlab_package_archive} ${platform_airflow_dags_url}
scp ${gitlab_package_archive} ${airflow_host}:
make install-dags
# Authors
# Clara Andrew-Wani 2021 (
from datetime import timedelta
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
......@@ -24,7 +24,7 @@ default_args = {
"retry_delay": timedelta(minutes=5),
"start_date": days_ago(1),
"catchup": True,
"schedule_interval": "@once",
"schedule_interval": None,
with DAG(
......@@ -37,16 +37,16 @@ with DAG(
image_suggestion_dir = os.environ.get("IMAGE_SUGGESTION_DIR", f'/srv/airflow-platform_eng/image-matching/')
# TODO: Undo hardcode, use airflow generated run id
run_id = '8419345a-3404-4a7c-93e1-b9e6813706ff'
snapshot = '2021-07-26'
monthly_snapshot = '2021-07'
snapshot = "{{ dag_run.conf['snapshot'] or '2021-09-08' }}"
monthly_snapshot = datetime.fromisoformat(snapshot).strftime('%Y-%m')
username = getpass.getuser()
hive_user_db = 'analytics_platform_eng'
config = configparser.ConfigParser()
ima_home = '/srv/airflow-platform_eng/image-matching'
# wikis = config.get("poc_wikis", "target_wikis")
# wikis = wikis.split()
wikis = ['kowiki', ]
wikis = ['kowiki', 'plwiki']
# Create directories for pipeline
algo_outputdir = os.path.join(image_suggestion_dir, f'runs/{run_id}/Output')
......@@ -73,13 +73,13 @@ with DAG(
# TODO: Look into SparkSubmitOperator
generate_placeholder_images = BashOperator(
bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON=python spark2-submit --archives {image_suggestion_dir}/venv.tar.gz#venv --properties-file {spark_config} {image_suggestion_dir}/python/'
bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON={ima_home}/venv/bin/python spark2-submit --properties-file /srv/airflow-platform_eng/image-matching/runs/{run_id}/ --archives {ima_home}/venv.tar.gz#venv {ima_home}/venv/bin/ {snapshot}'
# Update hive external table metadata
update_imagerec_table = BashOperator(
bash_command=f'hive -hiveconf username={username} -f {image_suggestion_dir}/ddl/external_imagerec.hql'
bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -f {image_suggestion_dir}/sql/external_imagerec.hql'
......@@ -87,7 +87,7 @@ with DAG(
for wiki in wikis:
algo_run = BashOperator(
bash_command=f'spark2-submit --properties-file {spark_config} {image_suggestion_dir}/python/'
bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON={ima_home}/venv/bin/python spark2-submit --properties-file {ima_home}/runs/{run_id}/ --archives {ima_home}/venv.tar.gz#venv {ima_home}/venv/bin/ {snapshot} {wiki}'
# Sensor for finished algo run
......@@ -106,8 +106,8 @@ with DAG(
upload_imagerec_to_hdfs = BashOperator(
bash_command=f'spark2-submit --properties-file {spark_config} --master {spark_master_local} \
--files {image_suggestion_dir}/etl/ \
{image_suggestion_dir}/etl/ \
--files {image_suggestion_dir}/spark/ \
{image_suggestion_dir}/spark/ \
--wiki {wiki} \
--snapshot {monthly_snapshot} \
--source file://{algo_outputdir}/{wiki}_{snapshot}_wd_image_candidates.tsv \
......@@ -115,16 +115,14 @@ with DAG(
# Link tasks
generate_spark_config >> generate_placeholder_images >> algo_run
algo_run >> raw_dataset_sensor
raw_dataset_sensor >> upload_imagerec_to_hdfs >> update_imagerec_table
generate_spark_config >> generate_placeholder_images >> algo_run >> raw_dataset_sensor >> upload_imagerec_to_hdfs >> update_imagerec_table
# Generate production data
hdfs_imagerec_prod = f'/user/{username}/imagerec_prod'
generate_production_data = BashOperator(
bash_command=f'spark2-submit --properties-file {spark_config} --files {image_suggestion_dir}/etl/ \
{image_suggestion_dir}/etl/ \
bash_command=f'spark2-submit --properties-file {spark_config} --files {image_suggestion_dir}/spark/ \
{image_suggestion_dir}/spark/ \
--snapshot {monthly_snapshot} \
--source {hdfs_imagerec} \
--destination {hdfs_imagerec_prod} \
......@@ -134,7 +132,7 @@ with DAG(
# Update hive external production metadata
update_imagerec_prod_table = BashOperator(
bash_command=f'hive -hiveconf username={username} -f {image_suggestion_dir}/ddl/external_imagerec_prod.hql'
bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -f {image_suggestion_dir}/sql/external_imagerec_prod.hql'
for wiki in wikis:
......@@ -142,7 +140,7 @@ with DAG(
# Export production datasets
export_prod_data = BashOperator(
bash_command=f'hive -hiveconf username={username} -hiveconf output_path={tsv_tmpdir}/{wiki}_{monthly_snapshot} -hiveconf wiki={wiki} -hiveconf snapshot={monthly_snapshot} -f {image_suggestion_dir}/ddl/export_prod_data.hql > {tsv_tmpdir}/{wiki}_{monthly_snapshot}_header'
bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -hiveconf output_path={tsv_tmpdir}/{wiki}_{monthly_snapshot} -hiveconf wiki={wiki} -hiveconf snapshot={monthly_snapshot} -f {image_suggestion_dir}/sql/export_prod_data.hql > {tsv_tmpdir}/{wiki}_{monthly_snapshot}_header'
# Sensor for production data
SHELL := /bin/bash
venv := venv
venv_archive_format := tar.gz
pip_requirements := requirements.txt
pip_requirements_test := requirements-test.txt
conda_python_version = 3.7
pyspark_version = 2.4.5
extra_pypi :=
CONDA_CMD := conda config --set pip_interop_enabled True; conda create -n ${venv} python=${conda_python_version}; conda init bash; source ~/.bashrc && conda activate ${venv}
ifneq ($(SKIP_DOCKER),true)
DOCKER_IMG := continuumio/miniconda3
DOCKER_CMD := docker run -it \
--rm \
-v /Users/gmodena/repos/gitlab/platform-airflow-dags/image-matching:/root \
-e SKIP_DOCKER=true \
-w /root ${DOCKER_IMG}
venv: ${pip_requirements}
${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
pip install --extra-index-url ${extra_pypi} -r ${pip_requirements}; \
conda deactivate; \
conda install conda-pack; \
conda-pack -n ${venv} --format ${venv_archive_format}"
test: ${pip_requirements_test}
${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
conda install openjdk pyspark==${pyspark_version}; \
pip install -r ${pip_requirements_test}; \
PYTHONPATH=${PYTHONPATH}:spark/ pytest --cov spark tests/"
......@@ -2,20 +2,23 @@
Training and dataset publishing pipeline for the [Image Suggestion]( service.
Airflow DAG for model training and etl.
Data pipeline for model training and etl.
## Content
- `dags` contains the airflow dags for this workflow.
- `conf` contains job specific config files.
- `spark` contains Spark based data processing tasks.
- `sbin` contains python scripts for data processing tasks.
- `sql` contains SQL/HQL based data processing tasks.
- `test` contains a test suite
## Test
Test in a Docker container
make test
python3 -m venv venv
source venv/bin/activate
pip install -r requirements-test.txt
PYTHONPATH=spark python3 -m test
Test on nativ system:
make test SKIP_DOCKER=true
spark.master yarn
spark.submit.deployMode cluster
spark.submit.deployMode client
# Cluster topology for regular sized jobs (15% resource utilisation)
spark.driver.memory 2g
#!/usr/bin/env python
# coding: utf-8
# In[ ]:
import re
import pickle
import pandas as pd
import math
import numpy as np
import random
import requests
#from bs4 import BeautifulSoup
import json
import os
from wmfdata.spark import get_session
# In[ ]:
get_ipython().system('which python')
# In[ ]:
# In[ ]:
# Pass in directory to place output files
output_dir = 'Output'
if not os.path.exists(output_dir):
# Pass in the full snapshot date
snapshot = '2021-07-26'
# Allow the passing of a single language as a parameter
language = 'kowiki'
# A spark session type determines the resource pool
# to initialise on yarn
spark_session_type = 'regular'
# Name of placeholder images parquet file
image_placeholders_file = 'image_placeholders'
# In[ ]:
# We use wmfdata boilerplate to init a spark session.
# Under the hood the library uses findspark to initialise
# Spark's environment. pyspark imports will be available
# after initialisation
spark = get_session(type='regular', app_name="ImageRec-DEV Training")
import pyspark
import pyspark.sql
# In[ ]:
languages=['enwiki','arwiki','kowiki','cswiki','viwiki','frwiki','fawiki','ptwiki','ruwiki','trwiki','plwiki','hewiki','svwiki','ukwiki','huwiki','hywiki','srwiki','euwiki','arzwiki','cebwiki','dewiki','bnwiki'] #language editions to consider
#val=100 #threshold above which we consider images as non-icons
# In[ ]:
reg = r'^([\w]+-[\w]+)'
short_snapshot = re.match(reg, snapshot).group()
# In[ ]:
reg = r'.+?(?=wiki)'
label_lang = re.match(reg, language).group()
# In[ ]:
get_ipython().system('ls /home/gmodena/ImageMatching/conf/')
# In[ ]:
# In[ ]:
image_placeholders =
# In[ ]:
def get_threshold(wiki_size):
#change th to optimize precision vs recall. recommended val for accuracy = 5
sze, th, lim = 50000, 15, 4
if (wiki_size >= sze):
#if wiki_size > base size, scale threshold by (log of ws/bs) + 1
return (math.log(wiki_size/sze, 10)+1)*th
#else scale th down by ratio bs/ws, w min possible val of th = th/limiting val
return max((wiki_size/sze) * th, th/lim)
# In[ ]:
for wiki in languages:
querytot="""SELECT COUNT(*) as c
FROM wmf_raw.mediawiki_page
WHERE page_namespace=0
AND page_is_redirect=0
AND snapshot='"""+short_snapshot+"""'
AND wiki_db='"""+wiki+"""'"""
wikisize = spark.sql(querytot).toPandas()
# In[ ]:
# In[ ]:
# In[ ]:
# The query below retrieves, for each unillustrated article: its Wikidata ID, the image of the Wikidata ID (if any), the Commons category of the Wikidata ID (if any), and the lead images of the articles in other languages (if any).
# `allowed_images` contains the list of icons (images appearing in more than `val` articles)
# `image_pageids` contains the list of illustrated articles (articles with images that are not icons)
# `noimage_pages` contains the pageid and Qid of unillustrated articles
# `qid_props` contains for each Qid in `noimage_pages`, the values of the following properties, when present:
# * P18: the item's image
# * P373: the item's Commons category
# * P31: the item's "instance of" property
# `category_image_list` contains the list of all images in a Commons category in `qid_props`
# `lan_page_images` contains the list of lead images in Wikipedia articles in all languages linked to each Qid
# `qid_props_with_image_list` is qid_props plus the list of images in the Commons category linked to the Wikidata item
# In[ ]:
for wiki in languages:
queryd="""WITH allowed_images AS
SELECT il_to
FROM wmf_raw.mediawiki_imagelinks
WHERE il_from_namespace=0
AND snapshot='"""+short_snapshot+"""'
AND wiki_db='"""+wiki+"""'
AND il_to not like '%\"%' AND il_to not like '%,%'
GROUP BY il_to
HAVING COUNT(il_to)>"""+str(val[wiki])+"""),
image_pageids AS
(SELECT DISTINCT il_from as pageid
FROM wmf_raw.mediawiki_imagelinks il1
LEFT ANTI JOIN allowed_images
ON allowed_images.il_to=il1.il_to
WHERE il1.il_from_namespace=0
AND il1.wiki_db='"""+wiki+"""'
AND il1.snapshot='"""+short_snapshot+"""'
pageimage_pageids AS
SELECT DISTINCT pp_page as pageid
FROM wmf_raw.mediawiki_page_props pp
WHERE pp.wiki_db ='"""+wiki+"""'
AND pp.snapshot='"""+short_snapshot+"""'
AND pp_propname in ('page_image','page_image_free')),
all_image_pageids as(
SELECT pageid
FROM image_pageids
SELECT pageid
FROM pageimage_pageids
noimage_pages as
SELECT wipl.item_id,p.page_id,p.page_title,page_len
FROM wmf_raw.mediawiki_page p
JOIN wmf.wikidata_item_page_link wipl
ON p.page_id=wipl.page_id
LEFT ANTI JOIN all_image_pageids
on all_image_pageids.pageid=wipl.page_id
WHERE p.page_namespace=0
AND page_is_redirect=0 AND p.wiki_db='"""+wiki+"""'
AND p.snapshot='"""+short_snapshot+"""'
AND wipl.snapshot='"""+snapshot+"""'
AND wipl.page_namespace=0
AND wipl.wiki_db='"""+wiki+"""'
ORDER BY page_len desc
qid_props AS
MAX(CASE WHEN = 'P18' THEN claim.mainSnak.datavalue.value ELSE NULL END) AS hasimage,
MAX(CASE WHEN = 'P373' THEN REPLACE(REPLACE(claim.mainSnak.datavalue.value,'\"',''),' ','_') ELSE NULL END) AS commonscategory,
MAX(CASE WHEN = 'P31' THEN claim.mainSnak.datavalue.value ELSE NULL END) AS instanceof
FROM wmf.wikidata_entity we
JOIN noimage_pages
LATERAL VIEW explode(labels) t AS label_lang,label_val
LATERAL VIEW OUTER explode(claims) c AS claim
WHERE typ='item'
AND t.label_lang='"""+label_lang+"""'
AND snapshot='"""+snapshot+"""'
AND in ('P18','P31','P373')
GROUP BY id,label_val
category_image_list AS
SELECT cl_to,concat_ws(';',collect_list(mp.page_title)) as category_imagelist
from qid_props
left join wmf_raw.mediawiki_categorylinks mc
on qid_props.commonscategory=mc.cl_to
join wmf_raw.mediawiki_page mp
on mp.page_id=mc.cl_from
LEFT ANTI JOIN image_placeholders
on image_placeholders.page_title = mp.page_title
WHERE mp.wiki_db ='commonswiki'
AND mp.snapshot='"""+short_snapshot+"""'
AND mp.page_namespace=6
AND mp.page_is_redirect=0
AND mc.snapshot='"""+short_snapshot+"""'
AND mc.wiki_db ='commonswiki'
AND mc.cl_type='file'
group by mc.cl_to
qid_props_with_image_list AS
SELECT id, label_val, hasimage, commonscategory, instanceof,category_imagelist
from qid_props
left join category_image_list
on qid_props.commonscategory=category_image_list.cl_to
lan_page_images AS
SELECT nip.item_id,nip.page_id,nip.page_title,nip.page_len,collect_list(concat(pp.wiki_db,': ',pp.pp_value)) as lan_images
FROM noimage_pages nip
LEFT JOIN wmf.wikidata_item_page_link wipl
LEFT JOIN wmf_raw.mediawiki_page_props pp
LEFT JOIN wmf_raw.mediawiki_page mp
ON nip.item_id=wipl.item_id
AND wipl.page_id=pp.pp_page
AND wipl.wiki_db=pp.wiki_db
AND mp.page_title=pp.pp_value
LEFT ANTI JOIN image_placeholders
ON image_placeholders.page_title = pp.pp_value
WHERE wipl.wiki_db !='"""+wiki+"""'
AND wipl.snapshot='"""+snapshot+"""'
AND wipl.page_namespace=0
AND pp.snapshot='"""+short_snapshot+"""'
AND pp_propname in ('page_image','page_image_free')
AND mp.wiki_db ='commonswiki'
AND mp.snapshot='"""+short_snapshot+"""'
AND mp.page_namespace=6
AND mp.page_is_redirect=0
GROUP BY nip.item_id,nip.page_id,nip.page_title,nip.page_len
joined_lan_page_images AS
SELECT nip.item_id,nip.page_id,nip.page_title,nip.page_len, lpi.lan_images
from noimage_pages nip
LEFT JOIN lan_page_images lpi
on nip.item_id=lpi.item_id
SELECT * from joined_lan_page_images
LEFT JOIN qid_props_with_image_list
qid_props = spark.sql(queryd).toPandas()
# Below I am just creating different tables according to whether an image is retrieved from a specific source (Wikidata image, Commons Category, or interlingual links)
# In[ ]:
for wiki in languages: