Commit 05888e6a authored by Gmodena, committed by GitHub

T275685 generate production datasets (#7)

* Add script to generate and export production datasets

* Move hql script to ddl

* Document publish.sh

* Add some crude metrics reporting

* Store artifacts and metrics by run identifier

* Fix variable names

* Adjust var names, record timestamps in metrics

* Enable dynamic partitioning

* Add snapshot partition to production dataset

* Fix dir name

* Update publish.sh doc

* Make virtual env before activating

* Fix: confidence_rating to source mapping

* Add export data summary

* Update validation notebook with regression cases

* Add test for confidence mapping

* Fix: call uuid4 for default dataset_id

* Fix missing comma in column list

* Export NULL values as empty strings.

* Generate data for all languages

* Update data export changelog

* Update data export changelog: set month to March

* Clean up validation notebook

* Load validation data from hive

* Fix character escaping
parent 2b0c8f63
@@ -8,5 +8,8 @@ spark.executor.memory 8g
spark.executor.cores 4
spark.sql.shuffle.partitions 256
# Job specific config
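# Dynamic partition overwrite lets the .mode('overwrite') write in etl/transform.py replace only
# the wiki/snapshot partitions being written, instead of truncating the whole destination table.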
spark.sql.sources.partitionOverwriteMode dynamic
# Append spark-defaults.conf from:
# /usr/lib/spark2/conf/spark-defaults.conf
@@ -22,8 +22,21 @@ def raw_data(spark_session):
"Some page with no suggestion",
None,
"arwiki",
"2020-12"
)
"2020-12",
),
(
"2",
"Q66666",
"523523",
"Some page with 3 suggestions",
'['
' {"image": "image2.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki"}, '
'{"image": "image3.jpg", "rating": 1, "note": "image was in the Wikidata item"}, '
'{"image": "image4.jpg", "rating": 3.0, "note": "image was found in the Commons category linked in the Wikidata item"}'
']',
"enwiki",
"2020-12",
),
],
RawDataset.schema,
)
-- This script is used to export production datasets,
-- in a format consumable by the APIs.
--
-- Run with:
-- hive -hiveconf username=${username} -hiveconf wiki=${wiki} -hiveconf snapshot=${monthly_snapshot} -f export_prod_data.hql
--
--
-- Format
-- * Include header: yes
-- * Field delimiter: "\t"
-- * Null value for missing recommendations
-- (image_id, confidence_rating, source fields): ""
--
-- Changelog:
-- * 2021-03-08: schema and format freeze.
--
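-- Hypothetical example of one exported row (values are illustrative only; fields are tab separated):
-- 523523<TAB>Some_page_title<TAB>Some_image.jpg<TAB>high<TAB>wikidata<TAB><dataset_id uuid><TAB>1614175200.0<TAB>enwiki
--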
use ${hiveconf:username};
set hivevar:null_value="";
select page_id,
page_title,
nvl(image_id, ${null_value}) as image_id,
nvl(confidence_rating, ${null_value}) as confidence_rating,
nvl(source, ${null_value}) as source,
dataset_id,
insertion_ts,
wiki
from imagerec_prod
where wiki = '${hiveconf:wiki}' and snapshot='${hiveconf:snapshot}'
@@ -8,11 +8,9 @@
--
-- Execution
-- hive -hiveconf username=<username> -f external_imagerec_prod.hql
USE ${hiveconf:username};
CREATE EXTERNAL TABLE IF NOT EXISTS `imagerec_prod`(
`wiki` string,
`page_id` string,
`page_title` string,
`image_id` string,
@@ -20,6 +18,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `imagerec_prod`(
`source` string,
`dataset_id` string,
`insertion_ts` float)
PARTITIONED BY (`wiki` string, `snapshot` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
......
@@ -3,6 +3,7 @@ from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import Column, DataFrame
from pyspark.sql import functions as F
import argparse
import sys
import uuid
import datetime
@@ -26,9 +27,9 @@ class RawDataset:
class ImageRecommendation:
confidence_rating: Column = (
F.when(F.col("rating").cast(IntegerType()) == 1, F.lit("low"))
F.when(F.col("rating").cast(IntegerType()) == 1, F.lit("high"))
.when(F.col("rating").cast(IntegerType()) == 2, F.lit("medium"))
.when(F.col("rating").cast(IntegerType()) == 3, F.lit("high"))
.when(F.col("rating").cast(IntegerType()) == 3, F.lit("low"))
)
source: Column = (
F.when(
@@ -91,27 +92,39 @@ class ImageRecommendation:
)
return with_recommendations.union(without_recommendations)
def parse_args():
parser = argparse.ArgumentParser(description='Transform raw algo output to production datasets')
parser.add_argument('--snapshot', help='Monthly snapshot date (YYYY-MM)')
parser.add_argument('--source', help='Source dataset path')
parser.add_argument('--destination', help='Destination path')
parser.add_argument('--dataset-id', help='Production dataset identifier (optional)', default=str(uuid.uuid4()), dest='dataset_id')
return parser.parse_args()
if __name__ == "__main__":
if len(sys.argv) < 2:
print(
"""Usage: spark-submit transform.py <source csv file> <destination csv file>"""
)
sys.exit(1)
source = sys.argv[1]
destination = sys.argv[2]
args = parse_args()
snapshot = args.snapshot
source = args.source
destination = args.destination
dataset_id = args.dataset_id
df = (
spark.read.options(delimiter="\t", header=False)
.schema(RawDataset.schema)
.csv(source)
)
dataset_id = str(uuid.uuid4())
insertion_ts = datetime.datetime.now().timestamp()
(
ImageRecommendation(df)
.transform()
.withColumn("dataset_id", F.lit(dataset_id))
.withColumn("insertion_ts", F.lit(insertion_ts))
.withColumn("snapshot", F.lit(snapshot))
.sort(F.desc("page_title"))
.write.options(delimiter="\t", header=False)
.partitionBy("wiki", "snapshot")
.mode('overwrite') # Requires dynamic partitioning enabled
.csv(destination)
)
%% Cell type:code id: tags:
``` python
from transform import RawDataset, ImageRecommendation
from wmfdata.spark import get_session
```
%% Cell type:code id: tags:
``` python
# We use wmfdata boilerplate to init a spark session.
# Under the hood the library uses findspark to initialise
# Spark's environment. pyspark imports will be available
# after initialisation
spark = get_session(type='regular', app_name="ImageRec-DEV analysis")
spark = get_session(type='regular', app_name="ImageRec-DEV Validation")
import pyspark
import pyspark.sql
from transform import RawDataset, ImageRecommendation
from pyspark.sql import functions as F
```
%% Cell type:code id: tags:
``` python
df = (
spark.read.options(delimiter="\t", header=False)
.schema(RawDataset.schema)
.csv("raw_output.tsv")
)
df = ImageRecommendation(df).transform()
df = spark.sql('SELECT * from gmodena.imagerec_prod')
```
%% Cell type:markdown id: tags:
## Summary stats
%% Cell type:code id: tags:
``` python
df.describe().show()
df.groupBy('snapshot', 'wiki').count().sort(F.desc('snapshot')).show(truncate=False)
```
%% Cell type:code id: tags:
``` python
for col in df.columns:
df.groupBy(col).count().sort(F.desc("count")).show()
df.groupBy(col).count().sort(F.desc("count")).show(truncate=False)
```
%% Cell type:code id: tags:
``` python
df.printSchema()
```
%% Cell type:code id: tags:
``` python
df.count()
```
%% Cell type:markdown id: tags:
%% Cell type:markdown id: tags:
## Distributions
%% Cell type:markdown id: tags:
TODO: add boxplots
%% Cell type:markdown id: tags:
# Regression
%% Cell type:markdown id: tags:
Track past data issues and potential regressions
%% Cell type:markdown id: tags:
1. The dataset contains unillustrated articles with no recommendation
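%% Cell type:markdown id: tags:
A hedged spot check for case 1 (a sketch, not one of the original validation cells): articles without a recommendation should still appear in the dataset, with `image_id` missing (NULL, or exported as an empty string).
%% Cell type:code id: tags:
``` python
# Count rows with no recommended image. Assumes `df` is the imagerec_prod
# dataframe loaded above and `F` is pyspark.sql.functions.
df.where(F.col("image_id").isNull() | (F.col("image_id") == "")).count()
```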
%% Cell type:markdown id: tags:
2. Confidence rating reflects image source.
Expected:
* high = wikidata
* medium = wikipedia
* low = commons
%% Cell type:code id: tags:
``` python
df.groupby("confidence_rating", "source").count().show(truncate=False)
```
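%% Cell type:markdown id: tags:
A possible programmatic follow-up to the groupBy cell above (a sketch; assumes `df` and `F` from the cells above): assert that each source maps to exactly the expected confidence rating, mirroring the unit test added in this commit.
%% Cell type:code id: tags:
``` python
# Verify the source -> confidence_rating mapping instead of eyeballing the groupBy output.
expected = {"wikidata": "high", "wikipedia": "medium", "commons": "low"}
for source, rating in expected.items():
    ratings = [
        r["confidence_rating"]
        for r in df.where(F.col("source") == source).select("confidence_rating").distinct().collect()
    ]
    assert ratings == [rating], f"{source}: {ratings}"
```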
......
#!/usr/bin/env bash
# Run the ImageRecommendation algo via algorunner.py, and generate production datasets
# for all languages defined in `wikis`.
#
# The intermediate algo output and the production datasets will be stored in HDFS
# and exposed as Hive external tables:
#
# - <username>.imagerec: raw datasets (algo output). Maps to hdfs:///users/<username>/imagerec
# - <username>.imagerec_prod: production datasets. Maps to hdfs:///users/<username>/imagerec_prod
#
# Where <username> is the user currently running the publish.sh script.
#
# Production datasets will be exported locally, in tsv format, under runs/<run_id>/imagerec_prod_${snapshot}.
#
# Each time publish.sh is invoked, it records the following data under runs/<run_id>:
#
# - metrics: a set of timing metrics generated by this script
# - Output: raw model output in tsv format
# - imagerec_prod_${snapshot}: production datasets in tsv format
# - regular.spark.properties: spark properties file for the transform.py job
#
# Each run has an associated, unique <run_id>. This uuid is propagated to the ETL transforms,
# and populates the `dataset_id` field of the production datasets. This allows a given dataset
# to be reconciled with the process that generated it.
#
# Usage: ./publish.sh <snapshot>
# Example: ./publish.sh 2021-01-25
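#
# A production dataset can be reconciled with the run that generated it via its dataset_id;
# for example (hypothetical query, assuming the table lives in the <username> database):
#   hive -e "SELECT DISTINCT dataset_id FROM <username>.imagerec_prod WHERE wiki='enwiki' AND snapshot='2021-01'"
# The returned uuid matches a runs/<run_id>/ directory produced by this script.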
snapshot=$1
run_id=$(cat /proc/sys/kernel/random/uuid)
# Target wikis to train ImageMatching on
wikis="aawiki abwiki acewiki adywiki afwiki akwiki alswiki amwiki angwiki anwiki arcwiki arwiki arywiki arzwiki astwiki aswiki atjwiki avkwiki avwiki awawiki aywiki azbwiki azwiki banwiki barwiki bat_smgwiki bawiki bclwiki bewiki bgwiki bhwiki biwiki bjnwiki bmwiki bnwiki bowiki bpywiki brwiki bswiki bugwiki bxrwiki cawiki cdowiki cebwiki cewiki chowiki chrwiki chwiki chywiki ckbwiki cowiki crhwiki crwiki csbwiki cswiki cuwiki cvwiki cywiki dawiki dewiki dinwiki diqwiki donatewiki dsbwiki dtywiki dvwiki dzwiki eewiki elwiki emlwiki enwiki eowiki eswiki etwiki euwiki extwiki fawiki ffwiki fiu_vrowiki fiwiki fjwiki fowiki frpwiki frrwiki frwiki furwiki fywiki gagwiki ganwiki gawiki gcrwiki gdwiki glkwiki glwiki gnwiki gomwiki gorwiki gotwiki guwiki gvwiki hakwiki hawiki hawwiki hewiki hifwiki hiwiki howiki hrwiki hsbwiki htwiki huwiki hywiki hywwiki hzwiki iawiki idwiki iewiki igwiki iiwiki ikwiki ilowiki incubatorwiki inhwiki iowiki iswiki itwiki iuwiki jamwiki jawiki jbowiki jvwiki kaawiki kabwiki kawiki kbdwiki kbpwiki kgwiki kiwiki kjwiki kkwiki klwiki kmwiki knwiki koiwiki kowiki krcwiki krwiki kshwiki kswiki kuwiki kvwiki kwwiki kywiki ladwiki lawiki lbewiki lbwiki lezwiki lfnwiki lgwiki lijwiki liwiki lldwiki lmowiki lnwiki lowiki lrcwiki ltgwiki ltwiki lvwiki maiwiki map_bmswiki mdfwiki mediawikiwiki metawiki mgwiki mhrwiki mhwiki minwiki miwiki mkwiki mlwiki mnwiki mnwwiki mrjwiki mrwiki mswiki mtwiki muswiki mwlwiki myvwiki mywiki mznwiki nahwiki napwiki nawiki nds_nlwiki ndswiki newiki newwiki ngwiki nlwiki nnwiki novwiki nowiki nqowiki nrmwiki nsowiki nvwiki nywiki ocwiki olowiki omwiki orwiki oswiki pagwiki pamwiki papwiki pawiki pcdwiki pdcwiki pflwiki pihwiki piwiki plwiki pmswiki pnbwiki pntwiki pswiki ptwiki quwiki rmwiki rmywiki rnwiki roa_rupwiki roa_tarawiki rowiki ruewiki ruwiki rwwiki sahwiki satwiki sawiki scnwiki scowiki scwiki sdwiki sewiki sgwiki shnwiki shwiki simplewiki siwiki skwiki slwiki smwiki snwiki sourceswiki sowiki specieswiki sqwiki srnwiki srwiki sswiki stqwiki stwiki suwiki svwiki swwiki szlwiki szywiki tawiki tcywiki tenwiki test2wiki testwiki tetwiki tewiki tgwiki thwiki tiwiki tkwiki tlwiki tnwiki towiki tpiwiki trwiki tswiki ttwiki tumwiki twwiki tyvwiki tywiki udmwiki ugwiki ukwiki urwiki uzwiki vecwiki vepwiki vewiki viwiki vlswiki votewiki vowiki warwiki wawiki wowiki wuuwiki xalwiki xhwiki xmfwiki yiwiki yowiki zawiki zeawiki zh_classicalwiki zh_min_nanwiki zh_yuewiki zhwiki zuwiki"
# wikis to export for PoC
poc_wikis="enwiki arwiki kowiki cswiki viwiki frwiki fawiki ptwiki ruwiki trwiki plwiki hewiki svwiki ukwiki huwiki hywiki srwiki euwiki arzwiki cebwiki dewiki bnwiki"
# Derive the monthly snapshot (YYYY-MM) from the full snapshot date (YYYY-MM-DD)
monthly_snapshot=$(echo ${snapshot} | awk -F'-' '{print $1"-"$2}')
username=$(whoami)
# Path where the raw dataset (algo output from the Jupyter notebook) will be stored
algo_outputdir=runs/${run_id}/Output
# Path on the local filesystem where production datasets will be stored.
outputdir=runs/${run_id}/imagerec_prod_${snapshot}
make venv
source venv/bin/activate
mkdir -p $(pwd)/runs/${run_id}/
metrics_dir=$(pwd)/runs/${run_id}/metrics
mkdir -p $metrics_dir
echo "Starting training run ${run_id} for snapshot=$snapshot. Model artifacts will be collected under
$(pwd)/runs/${run_id}"
# TODO(gmodena, 2021-02-02):
# Passing one wiki at a time gives us a feeling for per-wiki runtime deltas (to some degree, we could get this info by parsing hdfs snapshots).
# We could instead pass the whole list to algorunner.py at once, and have the pipeline run as a
# single (long running) Spark job; here we spin up one Spark cluster per wiki. This needs
# checking with AE, to better understand which workload fits our Hadoop cluster better.
for wiki in ${wikis}; do
# 1. Run the algo and generate data locally
echo "Generating recommendations for ${wiki}"
STARTTIME=${SECONDS}
python algorunner.py ${snapshot} ${wiki} ${algo_outputdir}
ENDTIME=${SECONDS}
metric_name=metrics.algorunner.${wiki}.${snapshot}.seconds
timestamp=$(date +%s)
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
# 2. Upload to HDFS
echo "Publishing raw data to HDFS for ${wiki}"
STARTTIME=${SECONDS}
hadoop fs -rm -r imagerec/data/wiki_db=${wiki}/snapshot=${monthly_snapshot}/
hadoop fs -mkdir -p imagerec/data/wiki_db=${wiki}/snapshot=${monthly_snapshot}/
hadoop fs -copyFromLocal ${algo_outputdir}/${wiki}_${snapshot}_wd_image_candidates.tsv imagerec/data/wiki_db=${wiki}/snapshot=${monthly_snapshot}/
ENDTIME=${SECONDS}
metric_name=metrics.hdfs.copyrawdata.${wiki}.${snapshot}.seconds
timestamp=$(date +%s)
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
# 3. Update hive external table metadata
echo "Updating Hive medatada for ${wiki}"
STARTTIME=${SECONDS}
hive -hiveconf username=${username} -f ddl/external_imagerec.hql
done
ENDTIME=${SECONDS}
timestamp=$(date +%s)
metric_name=metrics.hive.imagerec.${wiki}.${snapshot}.seconds
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
# 4. Submit the Spark production data ETL
echo "Generating production data"
## Generate spark config
spark_config=runs/$run_id/regular.spark.properties
cat conf/spark.properties.template /usr/lib/spark2/conf/spark-defaults.conf > ${spark_config}
STARTTIME=${SECONDS}
spark2-submit --properties-file ${spark_config} etl/transform.py \
--snapshot ${monthly_snapshot} \
--source imagerec/data/ \
--destination imagerec_prod/data/ \
--dataset-id ${run_id}
ENDTIME=${SECONDS}
metric_name=metrics.etl.transform.${snapshot}.seconds
timestamp=$(date +%s)
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
# 5. Update hive external table metadata (production)
STARTTIME=${SECONDS}
hive -hiveconf username=${username} -f ddl/external_imagerec_prod.hql
ENDTIME=${SECONDS}
metric_name=metrics.hive.imagerec_prod.${snapshot}.seconds
timestamp=$(date +%s)
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
# 6. Export production datasets
STARTTIME=${SECONDS}
mkdir ${outputdir}
for wiki in ${poc_wikis}; do
hive -hiveconf username=${username} -hiveconf wiki=${wiki} -hiveconf snapshot=${monthly_snapshot} -f ddl/export_prod_data.hql > ${outputdir}/prod-${wiki}-${snapshot}-wd_image_candidates.tsv
done
ENDTIME=${SECONDS}
echo "Datasets are available at $outputdir/"
metric_name=metrics.etl.export_prod_data.${snapshot}.seconds
timestamp=$(date +%s)
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
echo "Export summary"
cut -f 3,4 ${outputdir}/*.tsv | sort -k 1,2 | uniq -c
from etl.transform import ImageRecommendation
from pyspark.sql import functions as F
def test_etl(raw_data):
assert raw_data.count() == 2
assert raw_data.count() == 3
ddf = ImageRecommendation(raw_data).transform()
assert (
@@ -21,5 +23,27 @@ def test_etl(raw_data):
== 0
)
expected_num_records = 2
expected_num_records = 5
assert ddf.count() == expected_num_records
expected_confidence = {"wikipedia": "medium", "commons": "low", "wikidata": "high"}
for source in expected_confidence:
ddf.where(F.col("source") == source).select(
"confidence_rating"
).distinct().collect()
rows = (
ddf.where(F.col("source") == source)
.select("confidence_rating")
.distinct()
.collect()
)
assert len(rows) == 1
assert rows[0]["confidence_rating"] == expected_confidence[source]
# Unillustrated articles with no recommendation have no confidence rating
assert (
ddf.where(F.col("source") == "null")
.where(F.col("confidence_rating") != "null")
.count()
== 0
)