Commit b8a8a95e authored by Gmodena

Merge branch 'update-ima-project-wip' into 'multi-project-dags-repo'

Run the IMA data pipeline on an-airflow.

See merge request !3
parents 3dc52f1f 4cec0127
Pipeline #1061 failed with stages in 1 minute and 24 seconds
@@ -121,3 +121,5 @@ venv.bak/
# Makefile
.Makefile
# idea
.idea/
image: "continuumio/miniconda3"
stages:
- build
- test
- publish
# TODO(gmodena): cross-project artifacts download
# are not available in Gitlab Freemium.
#build-ima-upstream:
# stage: build-ima
# script:
# - ls -lhR
# needs:
# - project: gmodena/ImageMatching
# job: build
# ref: add-gitlab-ci
# artifacts: true
# curl is not installed by default on continuumio/miniconda3.
before_script:
- conda install -y make curl
# Install IMA dependencies in a virtual environment
# and publish it as a build artifact.
# TODO(gmodena): venvs produced by conda-pack and venv-pack
# exceed the artifact size limit provided by gitlab.
# Currently env and dep building is delegated to the "deploy"
# host. In our case, the developer workstation where
# `make deploy` is executed.
#build-ima-deps:
# stage: build
# script:
# - make -C image-matching venv
# artifacts:
# paths:
# - image-matching/venv.tar.gz
# expire_in: "1 hour"
# Install Apache Spark in the test image.
# TODO(gmodena): we should have a polyglot Docker image with jdk8, python3,
# and spark pre-installed
test-ima:
  stage: test
  script:
    - make test SKIP_DOCKER=true
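# Package the repository and upload it to this project's generic package registry.
# The Makefile target deploy-gitlab-build later downloads this same artifact via
# platform_airflow_dags_url.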
publish-dags:
  stage: publish
  script:
    - make archive
    - 'curl -v --header "JOB-TOKEN: $CI_JOB_TOKEN" --upload-file /tmp/platform-airflow-dags.tar.gz "${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/platform-airflow-dags/${CI_COMMIT_REF_NAME}-${CI_COMMIT_SHORT_SHA}/platform-airflow-dags.tar.gz"'
branch := $(shell git rev-parse --abbrev-ref HEAD)
short_commit_hash := $(shell git rev-parse --short=8 HEAD)
airflow_host := an-airflow1003.eqiad.wmnet
airflow_user := analytics-platform-eng
airflow_home := /srv/airflow-platform_eng/
environment := dev
gitlab_project_id := 56
gitlab_project_name := platform-airflow-dags
gitlab_package_version := ${branch}-${short_commit_hash}
gitlab_ci_api_root := https://gitlab.wikimedia.org/api/v4
gitlab_package_archive := platform-airflow-dags.tar.gz
platform_airflow_dags_url := ${gitlab_ci_api_root}/projects/${gitlab_project_id}/packages/generic/${gitlab_project_name}/${gitlab_package_version}/${gitlab_package_archive}
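# For example (hypothetical branch and commit): on branch my-feature at commit 1a2b3c4d
# the package URL expands to
# https://gitlab.wikimedia.org/api/v4/projects/56/packages/generic/platform-airflow-dags/my-feature-1a2b3c4d/platform-airflow-dags.tar.gz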
ima_home := image-matching
ima_venv_archive := venv.tar.gz
# Run commands to set up DAGs, venvs and project code on an Airflow worker.
install-dags:
	ssh ${airflow_host} 'sudo -u ${airflow_user} rm -rf ${airflow_home}/image-matching/venv'
	ssh ${airflow_host} 'sudo -u ${airflow_user} mkdir ${airflow_home}/image-matching/venv'
	ssh ${airflow_host} 'sudo -u ${airflow_user} tar xvzf ${airflow_home}/image-matching/venv.tar.gz -C ${airflow_home}/image-matching/venv'
	ssh ${airflow_host} 'rm ${gitlab_package_archive}'
ima-venv:
	rm -f ${ima_home}/${ima_venv_archive}
	make -C ${ima_home} venv

test:
	cd ${ima_home}; make test

archive: ima-venv
	tar cvz --exclude=".*" -f ${gitlab_package_archive} .
# Publish an artifact to the GitLab generic package registry using a private token.
publish: archive
	status=$$(git status --porcelain); \
	test -z "$$status" || { echo "Working directory is dirty. Aborting."; exit 1; }
	#curl -v --header "PRIVATE-TOKEN: ${GITLAB_PRIVATE_TOKEN}" --upload-file /tmp/platform-airflow-dags.tar.gz "${gitlab_ci_api_root}/projects/${gitlab_project_id}/packages/generic/platform-airflow-dags/${branch}-${short_commit_hash}/${gitlab_package_archive}"
# Test, assemble venvs, generate an archive locally and ship it to the airflow worker.
deploy-local-build: test archive
	scp ${gitlab_package_archive} ${airflow_host}:
	make install-dags
# Delegate CI to gitlab, and ship a successfully built artifact to the airflow worker.
deploy-gitlab-build:
	curl --fail -o ${gitlab_package_archive} ${platform_airflow_dags_url}
	scp ${gitlab_package_archive} ${airflow_host}:
	make install-dags
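# Typical usage from a developer workstation (per the notes above): run
# `make deploy-gitlab-build` after CI has published the artifact, or
# `make deploy-local-build` to build, archive and ship everything locally.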
# Authors
# Clara Andrew-Wani 2021 (https://github.com/clarakosi/ImageMatching/blob/airflow/etl.py).
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
@@ -24,7 +24,7 @@ default_args = {
    "retry_delay": timedelta(minutes=5),
    "start_date": days_ago(1),
    "catchup": True,
    "schedule_interval": None,
}

with DAG(
@@ -37,16 +37,16 @@ with DAG(
    image_suggestion_dir = os.environ.get("IMAGE_SUGGESTION_DIR", f'/srv/airflow-platform_eng/image-matching/')
    # TODO: Undo hardcode, use airflow generated run id
    run_id = '8419345a-3404-4a7c-93e1-b9e6813706ff'
    snapshot = "{{ dag_run.conf['snapshot'] or '2021-09-08' }}"
    monthly_snapshot = datetime.fromisoformat(snapshot).strftime('%Y-%m')
    username = getpass.getuser()
    hive_user_db = 'analytics_platform_eng'
    config = configparser.ConfigParser()
    ima_home = '/srv/airflow-platform_eng/image-matching'
    # config.read(f'{image_suggestion_dir}/conf/wiki.conf')
    # wikis = config.get("poc_wikis", "target_wikis")
    # wikis = wikis.split()
    wikis = ['kowiki', 'plwiki']

    # Create directories for pipeline
    algo_outputdir = os.path.join(image_suggestion_dir, f'runs/{run_id}/Output')
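    # Example of the snapshot parameterisation above (illustrative): triggering the
    # DAG with a dag_run conf of {"snapshot": "2021-09-08"} renders {snapshot} as
    # 2021-09-08 in the templated bash_commands below, with 2021-09 intended as the
    # corresponding monthly Hive snapshot.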
@@ -73,13 +73,13 @@ with DAG(
    # TODO: Look into SparkSubmitOperator
    generate_placeholder_images = BashOperator(
        task_id='generate_placeholder_images',
        bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON={ima_home}/venv/bin/python spark2-submit --properties-file /srv/airflow-platform_eng/image-matching/runs/{run_id}/regular.spark.properties --archives {ima_home}/venv.tar.gz#venv {ima_home}/venv/bin/placeholder_images.py {snapshot}'
    )

    # Update hive external table metadata
    update_imagerec_table = BashOperator(
        task_id='update_imagerec_table',
        bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -f {image_suggestion_dir}/sql/external_imagerec.hql'
    )
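    # Note on the spark2-submit pattern used above and below: --archives venv.tar.gz#venv
    # unpacks the packed environment as ./venv on each YARN executor, which is why
    # PYSPARK_PYTHON points at ./venv/bin/python, while the driver reuses the copy
    # already unpacked under {ima_home}/venv on the Airflow host.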
@@ -87,7 +87,7 @@ with DAG(
    for wiki in wikis:
        algo_run = BashOperator(
            task_id=f'run_algorithm_for_{wiki}',
            bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON={ima_home}/venv/bin/python spark2-submit --properties-file {ima_home}/runs/{run_id}/regular.spark.properties --archives {ima_home}/venv.tar.gz#venv {ima_home}/venv/bin/algorithm.py {snapshot} {wiki}'
        )

        # Sensor for finished algo run
@@ -106,8 +106,8 @@ with DAG(
        upload_imagerec_to_hdfs = BashOperator(
            task_id=f'upload_{wiki}_imagerec_to_hdfs',
            bash_command=f'spark2-submit --properties-file {spark_config} --master {spark_master_local} \
                --files {image_suggestion_dir}/spark/schema.py \
                {image_suggestion_dir}/spark/raw2parquet.py \
                --wiki {wiki} \
                --snapshot {monthly_snapshot} \
                --source file://{algo_outputdir}/{wiki}_{snapshot}_wd_image_candidates.tsv \
@@ -115,16 +115,14 @@ with DAG(
        )

        # Link tasks
        generate_spark_config >> generate_placeholder_images >> algo_run >> raw_dataset_sensor >> upload_imagerec_to_hdfs >> update_imagerec_table

    # Generate production data
    hdfs_imagerec_prod = f'/user/{username}/imagerec_prod'
    generate_production_data = BashOperator(
        task_id='generate_production_data',
        bash_command=f'spark2-submit --properties-file {spark_config} --files {image_suggestion_dir}/spark/schema.py \
            {image_suggestion_dir}/spark/transform.py \
            --snapshot {monthly_snapshot} \
            --source {hdfs_imagerec} \
            --destination {hdfs_imagerec_prod} \
@@ -134,7 +132,7 @@ with DAG(
    # Update hive external production metadata
    update_imagerec_prod_table = BashOperator(
        task_id='update_imagerec_prod_table',
        bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -f {image_suggestion_dir}/sql/external_imagerec_prod.hql'
    )

    for wiki in wikis:
@@ -142,7 +140,7 @@ with DAG(
        # Export production datasets
        export_prod_data = BashOperator(
            task_id=f'export_{wiki}_prod_data',
            bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -hiveconf output_path={tsv_tmpdir}/{wiki}_{monthly_snapshot} -hiveconf wiki={wiki} -hiveconf snapshot={monthly_snapshot} -f {image_suggestion_dir}/sql/export_prod_data.hql > {tsv_tmpdir}/{wiki}_{monthly_snapshot}_header'
        )

        # Sensor for production data
...
SHELL := /bin/bash
venv := venv
venv_archive_format := tar.gz
pip_requirements := requirements.txt
pip_requirements_test := requirements-test.txt
conda_python_version = 3.7
pyspark_version = 2.4.5
extra_pypi := https://gitlab.wikimedia.org/api/v4/projects/40/packages/pypi/simple
CONDA_CMD := conda config --set pip_interop_enabled True; conda create -n ${venv} python=${conda_python_version}; conda init bash; source ~/.bashrc && conda activate ${venv}
ifneq ($(SKIP_DOCKER),true)
DOCKER_IMG := continuumio/miniconda3
DOCKER_CMD := docker run -it \
	--rm \
	-v $(CURDIR):/root \
	-e SKIP_DOCKER=true \
	-w /root ${DOCKER_IMG}
endif
venv: ${pip_requirements}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
		pip install --extra-index-url ${extra_pypi} -r ${pip_requirements}; \
		conda deactivate; \
		conda install conda-pack; \
		conda-pack -n ${venv} --format ${venv_archive_format}"
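# conda-pack writes the packed environment to venv.tar.gz (by default the environment
# name plus the archive format). This is the archive that `make install-dags` unpacks
# on the Airflow host and that the DAG ships to Spark executors via --archives.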
test: ${pip_requirements_test}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
		conda install openjdk pyspark==${pyspark_version}; \
		pip install -r ${pip_requirements_test}; \
		PYTHONPATH=${PYTHONPATH}:spark/ pytest --cov spark tests/"
@@ -2,20 +2,23 @@

Training and dataset publishing pipeline for the [Image Suggestion](https://phabricator.wikimedia.org/project/profile/5171/) service.

Data pipeline for model training and etl.

## Content

- `conf` contains job specific config files.
- `spark` contains Spark based data processing tasks.
- `sbin` contains python scripts for data processing tasks.
- `sql` contains SQL/HQL based data processing tasks.
- `test` contains a test suite
## Test

Test in a Docker container:
```shell
make test
```

Test on the native system:
```shell
make test SKIP_DOCKER=true
```
spark.master yarn
spark.submit.deployMode client

# Cluster topology for regular sized jobs (15% resource utilisation)
# https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster/Spark#Spark_Resource_Settings
spark.driver.memory 2g
...
#!/usr/bin/env python
# coding: utf-8
# In[ ]:
import re
import pickle
import pandas as pd
import math
import numpy as np
import random
import requests
#from bs4 import BeautifulSoup
import json
import os
from wmfdata.spark import get_session
# In[ ]:
get_ipython().system('which python')
# In[ ]:
qids_and_properties={}
# In[ ]:
# Pass in directory to place output files
output_dir = 'Output'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Pass in the full snapshot date
snapshot = '2021-07-26'
# Allow the passing of a single language as a parameter
language = 'kowiki'
# A spark session type determines the resource pool
# to initialise on yarn
spark_session_type = 'regular'
# Name of placeholder images parquet file
image_placeholders_file = 'image_placeholders'
# In[ ]:
# We use wmfdata boilerplate to init a spark session.
# Under the hood the library uses findspark to initialise
# Spark's environment. pyspark imports will be available
# after initialisation
spark = get_session(type='regular', app_name="ImageRec-DEV Training")
import pyspark
import pyspark.sql
# In[ ]:
languages=['enwiki','arwiki','kowiki','cswiki','viwiki','frwiki','fawiki','ptwiki','ruwiki','trwiki','plwiki','hewiki','svwiki','ukwiki','huwiki','hywiki','srwiki','euwiki','arzwiki','cebwiki','dewiki','bnwiki'] #language editions to consider
#val=100 #threshold above which we consider images as non-icons
languages=[language]
# In[ ]:
reg = r'^([\w]+-[\w]+)'
short_snapshot = re.match(reg, snapshot).group()
short_snapshot
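# e.g. snapshot '2021-07-26' yields short_snapshot '2021-07'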
# In[ ]:
reg = r'.+?(?=wiki)'
label_lang = re.match(reg, language).group()
label_lang
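# e.g. language 'kowiki' yields label_lang 'ko'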
# In[ ]:
get_ipython().system('ls /home/gmodena/ImageMatching/conf/metrics.properties')
# In[ ]:
len(languages)
# In[ ]:
image_placeholders = spark.read.parquet(image_placeholders_file)
image_placeholders.createOrReplaceTempView("image_placeholders")
# In[ ]:
def get_threshold(wiki_size):
    # change th to optimize precision vs recall. recommended val for accuracy = 5
    sze, th, lim = 50000, 15, 4
    if (wiki_size >= sze):
        # if wiki_size > base size, scale threshold by (log of ws/bs) + 1
        return (math.log(wiki_size/sze, 10)+1)*th
    # else scale th down by ratio bs/ws, w min possible val of th = th/limiting val
    return max((wiki_size/sze) * th, th/lim)
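# In[ ]:

# Illustrative sanity check of the scaling behaviour (example values only):
# below the 50k-page base size the threshold shrinks towards th/lim,
# above it it grows logarithmically with wiki size.
get_threshold(10_000)   # max((10000/50000)*15, 15/4) = 3.75
get_threshold(500_000)  # (math.log(500000/50000, 10)+1)*15 = 30.0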
# In[ ]:
val={}
total={}
for wiki in languages:
    querytot="""SELECT COUNT(*) as c
    FROM wmf_raw.mediawiki_page
    WHERE page_namespace=0
    AND page_is_redirect=0
    AND snapshot='"""+short_snapshot+"""'
    AND wiki_db='"""+wiki+"""'"""
    wikisize = spark.sql(querytot).toPandas()
    val[wiki]=get_threshold(int(wikisize['c']))
    total[wiki]=int(wikisize['c'])
# In[ ]:
val
# In[ ]:
total
# In[ ]:
wikisize
# The query below retrieves, for each unillustrated article: its Wikidata ID, the image of the Wikidata ID (if any), the Commons category of the Wikidata ID (if any), and the lead images of the articles in other languages (if any).
#
# `allowed_images` contains the list of icons (images appearing in more than `val` articles)
#
# `image_pageids` contains the list of illustrated articles (articles with images that are not icons)
#
# `noimage_pages` contains the pageid and Qid of unillustrated articles
#
# `qid_props` contains for each Qid in `noimage_pages`, the values of the following properties, when present:
# * P18: the item's image
# * P373: the item's Commons category
# * P31: the item's "instance of" property
#
# `category_image_list` contains the list of all images in a Commons category in `qid_props`
#
# `lan_page_images` contains the list of lead images in Wikipedia articles in all languages linked to each Qid
#
# `qid_props_with_image_list` is qid_props plus the list of images in the Commons category linked to the Wikidata item
#
#
# In[ ]:
for wiki in languages:
    print(wiki)
    queryd="""WITH allowed_images AS
(
SELECT il_to
FROM wmf_raw.mediawiki_imagelinks
WHERE il_from_namespace=0
AND snapshot='"""+short_snapshot+"""'
AND wiki_db='"""+wiki+"""'
AND il_to not like '%\"%' AND il_to not like '%,%'
GROUP BY il_to
HAVING COUNT(il_to)>"""+str(val[wiki])+"""),
image_pageids AS
(SELECT DISTINCT il_from as pageid
FROM wmf_raw.mediawiki_imagelinks il1
LEFT ANTI JOIN allowed_images
ON allowed_images.il_to=il1.il_to
WHERE il1.il_from_namespace=0
AND il1.wiki_db='"""+wiki+"""'
AND il1.snapshot='"""+short_snapshot+"""'
),
pageimage_pageids AS
(
SELECT DISTINCT pp_page as pageid
FROM wmf_raw.mediawiki_page_props pp
WHERE pp.wiki_db ='"""+wiki+"""'
AND pp.snapshot='"""+short_snapshot+"""'
AND pp_propname in ('page_image','page_image_free')),
all_image_pageids as(
SELECT pageid
FROM image_pageids
UNION
SELECT pageid
FROM pageimage_pageids
),
noimage_pages as
(
SELECT wipl.item_id,p.page_id,p.page_title,page_len
FROM wmf_raw.mediawiki_page p
JOIN wmf.wikidata_item_page_link wipl
ON p.page_id=wipl.page_id
LEFT ANTI JOIN all_image_pageids
on all_image_pageids.pageid=wipl.page_id
WHERE p.page_namespace=0
AND page_is_redirect=0 AND p.wiki_db='"""+wiki+"""'
AND p.snapshot='"""+short_snapshot+"""'
AND wipl.snapshot='"""+snapshot+"""'
AND wipl.page_namespace=0
AND wipl.wiki_db='"""+wiki+"""'
ORDER BY page_len desc
),
qid_props AS
(
SELECT we.id,label_val,
MAX(CASE WHEN claim.mainSnak.property = 'P18' THEN claim.mainSnak.datavalue.value ELSE NULL END) AS hasimage,
MAX(CASE WHEN claim.mainSnak.property = 'P373' THEN REPLACE(REPLACE(claim.mainSnak.datavalue.value,'\"',''),' ','_') ELSE NULL END) AS commonscategory,
MAX(CASE WHEN claim.mainSnak.property = 'P31' THEN claim.mainSnak.datavalue.value ELSE NULL END) AS instanceof
FROM wmf.wikidata_entity we
JOIN noimage_pages
ON we.id=noimage_pages.item_id
LATERAL VIEW explode(labels) t AS label_lang,label_val
LATERAL VIEW OUTER explode(claims) c AS claim
WHERE typ='item'
AND t.label_lang='"""+label_lang+"""'
AND snapshot='"""+snapshot+"""'
AND claim.mainSnak.property in ('P18','P31','P373')
GROUP BY id,label_val
),
category_image_list AS
(
SELECT cl_to,concat_ws(';',collect_list(mp.page_title)) as category_imagelist
from qid_props
left join wmf_raw.mediawiki_categorylinks mc
on qid_props.commonscategory=mc.cl_to
join wmf_raw.mediawiki_page mp
on mp.page_id=mc.cl_from
LEFT ANTI JOIN image_placeholders
on image_placeholders.page_title = mp.page_title
WHERE mp.wiki_db ='commonswiki'
AND mp.snapshot='"""+short_snapshot+"""'
AND mp.page_namespace=6