Commit adf6d2ae authored by Gmodena

Merge branch 'refactor-packaging' into 'main'

Packaging ImageMatching as a Python wheel

See merge request gmodena/ImageMatching!32
parents 7cb80f12 b3ace785
Pipeline #1062 passed with stages in 54 seconds
name: build
on: [push, pull_request]

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      max-parallel: 4
      matrix:
        python-version: [3.7, ]
    steps:
      - uses: actions/checkout@v1
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v1
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          make venv
      - name: Lint python files with flake8
        run: |
          make flake8
      - uses: olafurpg/setup-scala@v10
        with:
          java-version: adopt@1.8
      - name: Install Apache Spark
        run: |
          # This command will install vanilla spark under ./spark-2.4.8-bin-hadoop2.7
          make install_spark
      - name: Test with pytest
        run: |
          export SPARK_HOME=$(pwd)/spark-2.4.8-bin-hadoop2.7
          export PYTHONPATH=${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-0.10.7-src.zip:${PYTHONPATH}
          export PATH=${PATH}:${SPARK_HOME}/bin:${SPARK_HOME}/sbin
          make test
build/
dist/
venv/
algorunner.egg-info
.ipynb_checkpoints/
image: "python:3.7"

stages:
  - build
  - test
  - publish

build-job:
  stage: build
  script:
    - |
      # convert notebook to pyspark script
      make py
  artifacts:
    paths:
      - build/algorithm.py
    expire_in: 1 week

# TODO(gmodena): investigate why CI_JOB_TOKEN is not expanded correctly when set in .pypirc
publish-package:
  stage: publish
  when: manual
  script:
    - make venv
    - . venv/bin/activate; python setup.py sdist bdist_wheel
    - TWINE_PASSWORD=${CI_JOB_TOKEN} TWINE_USERNAME=gitlab-ci-token ./venv/bin/python -m twine upload --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/* --verbose
@@ -2,22 +2,20 @@ spark_version := 2.4.8
hadoop_version := 2.7
spark_home := spark-${spark_version}-bin-hadoop${hadoop_version}
spark_tgz_url := https://downloads.apache.org/spark/spark-${spark_version}/${spark_home}.tgz
ima_notebook := ima/notebooks/algorithm.ipynb
pypirc := .pypirc
pypi_repo := gitlab

venv: requirements.txt
	test -d venv || python3 -m venv venv
	. venv/bin/activate; pip3 install --upgrade pip; pip3 install -Ur requirements.txt;

py: venv
	rm -rf build
	# nbconvert output is saved as <basename>.py
	. venv/bin/activate; jupyter nbconvert ${ima_notebook} --to script --output-dir='./build'

wheel: venv
	. venv/bin/activate; python3 setup.py bdist_wheel

publish: wheel
	. venv/bin/activate; python3 -m twine upload --repository ${pypi_repo} dist/* --config-file ${pypirc} --verbose

install_spark:
	test -d ${spark_home} || (wget ${spark_tgz_url}; tar -xzvf ${spark_home}.tgz)

clean_spark:
	rm -r ${spark_home}; rm -rf ${spark_home}.tgz

flake8: venv
	# stop the build if there are Python syntax errors or undefined names in *.py files
	. venv/bin/activate; flake8 *.py dataset_metrics/ etl/ tests/ --count --select=E9,F63,F7,F82 --show-source --statistics
	# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
	. venv/bin/activate; flake8 *.py dataset_metrics/ etl/ tests/ --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics

test: venv
	. venv/bin/activate; PYTHONPATH=${PYTHONPATH}:etl/ pytest --cov etl tests/
![](https://github.com/mirrys/ImageMatching/workflows/build/badge.svg?branch=main)
# ImageMatching
Image recommendation for unillustrated Wikipedia articles

@@ -11,6 +9,12 @@ ssh stat1005.eqiad.wmnet
```

### Installation
```shell
pip install algorunner --extra-index-url https://gitlab.wikimedia.org/api/v4/projects/40/packages/pypi/simple
```

### Build from source
First, clone the repository
```shell
git clone https://github.com/clarakosi/ImageMatching.git
```

@@ -27,15 +31,18 @@ Install the dependencies
```shell
export http_proxy=http://webproxy.eqiad.wmnet:8080
export https_proxy=http://webproxy.eqiad.wmnet:8080
make wheel
pip install dist/algorunner-0.2.0-py3-none-any.whl --no-cache-dir
```
`scripts` are installed at `./venv/bin/` and automatically added to PATH.

### Running the script
To run the script pass in the **snapshot** (required), **language** (defaults to all wikis),
and **output directory** (defaults to Output)
```shell
algorunner.py 2020-12-28 hywiki Output
```

The output .ipynb and .tsv files can be found in your output directory

@@ -43,80 +50,3 @@
```shell
ls Output
hywiki_2020-12-28.ipynb hywiki_2020-12-28_wd_image_candidates.tsv
```
## Production data ETL
`etl` contains [pyspark](https://spark.apache.org/docs/latest/api/python/index.html) utilities to transform the
algo raw output into a _production dataset_ that will be consumed by a service.
### TSV to parquet
`raw2parquet.py` is a job that loads a TSV file (the model output), converts it to
Parquet, and stores it in HDFS (or locally) using the `wiki_db=wiki/snapshot=YYYY-MM`
partitioning scheme.
```bash
spark2-submit --properties-file conf/spark.properties --files etl/schema.py etl/raw2parquet.py \
--wiki <wiki name> \
--snapshot <YYYY-MM> \
--source <raw data> \
--destination <production data>
```
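For instance, a hypothetical invocation for an `hywiki` 2020-12 run might look as follows; the wiki, snapshot, source TSV, and destination path are illustrative values, not prescribed by the repository:
```bash
# Illustrative values only: the source follows the algorunner output naming shown
# above, and the destination mirrors the HDFS layout used by ddl/external_imagerec.hql.
spark2-submit --properties-file conf/spark.properties --files etl/schema.py etl/raw2parquet.py \
    --wiki hywiki \
    --snapshot 2020-12 \
    --source hywiki_2020-12-28_wd_image_candidates.tsv \
    --destination hdfs://analytics-hadoop/user/${USER}/imagerec/data
```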
### Production dataset
`transform.py` parses the raw model output, transforms it into production data,
and stores it in HDFS (or locally) using the `wiki=wiki/snapshot=YYYY-MM` partitioning scheme.
```bash
spark2-submit --files etl/schema.py etl/transform.py \
--snapshot <YYYY-MM> \
--source <raw data> \
--destination <production data>
```
`conf/spark.properties` provides default settings to run the ETL as a [regular size spark job](https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster/Spark#Spark_Resource_Settings) on WMF's Analytics cluster.
```bash
spark2-submit --properties-file conf/spark.properties --files etl/schema.py etl/transform.py \
--wiki <wiki name> \
--snapshot <YYYY-MM> \
--source <raw data> \
--destination <production data>
```
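As an illustration, a run over the Parquet output of the previous step could look like this; all values and paths are examples, not prescribed by the repository:
```bash
# Illustrative values only: reads the parquet produced by raw2parquet.py and writes
# the production dataset where ddl/external_imagerec_prod.hql expects it.
spark2-submit --properties-file conf/spark.properties --files etl/schema.py etl/transform.py \
    --wiki hywiki \
    --snapshot 2020-12 \
    --source hdfs://analytics-hadoop/user/${USER}/imagerec/data \
    --destination hdfs://analytics-hadoop/user/${USER}/imagerec_prod/data
```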
## Metrics collection
On WMF's cluster the Hadoop Resource Manager (and Spark History) is available at `https://yarn.wikimedia.org/cluster`.
Additional instrumentation can be enabled by passing a `metrics.properties` file to the Notebook or ETL jobs. A template
metrics file, which outputs to the driver and executor stdout, can be found at `conf/metrics.properties.template`.
The easiest way to do this is by setting `PYSPARK_SUBMIT_ARGS`. For example:
```bash
export PYSPARK_SUBMIT_ARGS="--files ./conf/metrics.properties --conf spark.metrics.conf=metrics.properties pyspark-shell"
python3 algorunner.py 2020-12-28 hywiki Output
```
This will submit the `algorunner` job with additional instrumentation.
For more information, refer to https://spark.apache.org/docs/latest/monitoring.html.
### Get dataset Metrics
To get the dataset metrics, run the `dataset_metrics_runner.py` script. The script expects the **snapshot** (required)
and **output directory** (defaults to Output):
```shell
cd dataset_metrics/
python3 dataset_metrics_runner.py 2021-01 Output
```
### Exporting datasets
The following scripts export the datasets currently used by client teams; an example invocation follows the list.
* `ddl/export_prod_data.hql` generates the canonical dataset for the `image-suggestions-api` service.
* `ddl/export_prod_data-android.hql` generates an Android specific variant.
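For example, a hypothetical export of the `hywiki` 2021-02-22 snapshot could be run as shown below; the hiveconf values are placeholders:
```bash
# Illustrative values only: output_path is a local directory that Hive writes
# the TSV export into (see the "Run with" header of the .hql scripts).
hive -hiveconf output_path=imagerec_prod_2021-02-22/hywiki \
     -hiveconf username=${USER} \
     -hiveconf wiki=hywiki \
     -hiveconf snapshot=2021-02-22 \
     -f ddl/export_prod_data.hql
```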
A template is provided at `ddl/imagerec.sqlite.template` to ingest data into sqlite
for testing and validation purposes. It's parametrized by a `SNAPSHOT` variable;
an sqlite script (DDL and `.import`s) can be generated in Bash with:
```bash
export SNAPSHOT=2021-02-22
eval "cat <<EOF
$(cat imagerec.sqlite.template)
EOF
" 2> /dev/null
```
# Enable JvmSource for instance master, worker, driver and executor
#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
# Polling period for the ConsoleSink
#*.sink.console.period=10
# Unit of the polling period for the ConsoleSink
#*.sink.console.unit=seconds
# Polling period for the ConsoleSink specific for the master instance
#master.sink.console.period=15
# Unit of the polling period for the ConsoleSink specific for the master
# instance
#master.sink.console.unit=seconds
# Enable CsvSink for all instances
#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
# Polling period for CsvSink
#*.sink.csv.period=1
#*.sink.csv.unit=minutes
# Polling directory for CsvSink
#*.sink.csv.directory=tmp/
# Worker instance overlap polling period
#worker.sink.csv.period=1
#worker.sink.csv.unit=minutes
spark.master yarn
spark.submit.deployMode client
# Cluster topology for regular sized jobs (15% resource utilisation)
# https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster/Spark#Spark_Resource_Settings
spark.driver.memory 2g
spark.dynamicAllocation.maxExecutors 64
spark.executor.memory 8g
spark.executor.cores 4
spark.sql.shuffle.partitions 256
# Job specific config
spark.sql.sources.partitionOverwriteMode dynamic
# Append spark-defaults.conf from:
# /usr/lib/spark2/conf/spark-defaults.conf
# List of wiki languages available
all_wikis="aawiki abwiki acewiki adywiki afwiki akwiki alswiki amwiki angwiki anwiki arcwiki arwiki arywiki arzwiki astwiki aswiki atjwiki avkwiki avwiki awawiki aywiki azbwiki azwiki banwiki barwiki bat_smgwiki bawiki bclwiki bewiki bgwiki bhwiki biwiki bjnwiki bmwiki bnwiki bowiki bpywiki brwiki bswiki bugwiki bxrwiki cawiki cdowiki cebwiki cewiki chowiki chrwiki chwiki chywiki ckbwiki cowiki crhwiki crwiki csbwiki cswiki cuwiki cvwiki cywiki dawiki dewiki dinwiki diqwiki donatewiki dsbwiki dtywiki dvwiki dzwiki eewiki elwiki emlwiki enwiki eowiki eswiki etwiki euwiki extwiki fawiki ffwiki fiu_vrowiki fiwiki fjwiki fowiki frpwiki frrwiki frwiki furwiki fywiki gagwiki ganwiki gawiki gcrwiki gdwiki glkwiki glwiki gnwiki gomwiki gorwiki gotwiki guwiki gvwiki hakwiki hawiki hawwiki hewiki hifwiki hiwiki howiki hrwiki hsbwiki htwiki huwiki hywiki hywwiki hzwiki iawiki idwiki iewiki igwiki iiwiki ikwiki ilowiki incubatorwiki inhwiki iowiki iswiki itwiki iuwiki jamwiki jawiki jbowiki jvwiki kaawiki kabwiki kawiki kbdwiki kbpwiki kgwiki kiwiki kjwiki kkwiki klwiki kmwiki knwiki koiwiki kowiki krcwiki krwiki kshwiki kswiki kuwiki kvwiki kwwiki kywiki ladwiki lawiki lbewiki lbwiki lezwiki lfnwiki lgwiki lijwiki liwiki lldwiki lmowiki lnwiki lowiki lrcwiki ltgwiki ltwiki lvwiki maiwiki map_bmswiki mdfwiki mediawikiwiki metawiki mgwiki mhrwiki mhwiki minwiki miwiki mkwiki mlwiki mnwiki mnwwiki mrjwiki mrwiki mswiki mtwiki muswiki mwlwiki myvwiki mywiki mznwiki nahwiki napwiki nawiki nds_nlwiki ndswiki newiki newwiki ngwiki nlwiki nnwiki novwiki nowiki nqowiki nrmwiki nsowiki nvwiki nywiki ocwiki olowiki omwiki orwiki oswiki pagwiki pamwiki papwiki pawiki pcdwiki pdcwiki pflwiki pihwiki piwiki plwiki pmswiki pnbwiki pntwiki pswiki ptwiki quwiki rmwiki rmywiki rnwiki roa_rupwiki roa_tarawiki rowiki ruewiki ruwiki rwwiki sahwiki satwiki sawiki scnwiki scowiki scwiki sdwiki sewiki sgwiki shnwiki shwiki simplewiki siwiki skwiki slwiki smwiki snwiki sourceswiki sowiki specieswiki sqwiki srnwiki srwiki sswiki stqwiki stwiki suwiki svwiki swwiki szlwiki szywiki tawiki tcywiki tenwiki test2wiki testwiki tetwiki tewiki tgwiki thwiki tiwiki tkwiki tlwiki tnwiki towiki tpiwiki trwiki tswiki ttwiki tumwiki twwiki tyvwiki tywiki udmwiki ugwiki ukwiki urwiki uzwiki vecwiki vepwiki vewiki viwiki vlswiki votewiki vowiki warwiki wawiki wowiki wuuwiki xalwiki xhwiki xmfwiki yiwiki yowiki zawiki zeawiki zh_classicalwiki zh_min_nanwiki zh_yuewiki zhwiki zuwiki"
# List of target wikis provided to client teams.
target_wikis="enwiki arwiki kowiki cswiki viwiki frwiki fawiki ptwiki ruwiki trwiki plwiki hewiki svwiki ukwiki huwiki hywiki srwiki euwiki arzwiki cebwiki dewiki bnwiki eswiki itwiki"
import pytest
from etl.transform import RawDataset
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def raw_data(spark_session):
    return spark_session.createDataFrame(
        [
            (
                "0",
                "Q1234",
                "44444",
                "Some page with suggestions",
                '[{"image": "image1.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki"}]',
                None,
                "arwiki",
                "2020-12",
            ),
            (
                "1",
                "Q56789",
                "55555",
                "Some page with no suggestion",
                None,
                None,
                "arwiki",
                "2020-12",
            ),
            (
                "2",
                "Q66666",
                "523523",
                "Some page with 3 suggestions",
                "["
                '{"image": "image2.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki,arwiki,enwiki"}, '
                '{"image": "image3.jpg", "rating": 1, "note": "image was in the Wikidata item"}, '
                '{"image": "image4.jpg", "rating": 3.0, "note": "image was found in the Commons category linked in '
                'the Wikidata item"} '
                "]",
                '{"entity-type":"item","numeric-id":577,"id":"Q577"}',
                "enwiki",
                "2020-12",
            ),
        ],
        RawDataset.schema,
    )


@pytest.fixture(scope="session")
def wikis(spark_session: SparkSession) -> DataFrame:
    return spark_session.createDataFrame(
        [
            ["image was found in the following Wikis: ruwiki, itwiki,enwiki"],
            ["image was found in the following Wikis: "],
            [None],
        ],
        ["note"],
    )


def assert_shallow_equals(ddf: DataFrame, other_ddf: DataFrame) -> None:
    assert len(set(ddf.columns).difference(set(other_ddf.columns))) == 0
    assert ddf.subtract(other_ddf).rdd.isEmpty()
    assert other_ddf.subtract(ddf).rdd.isEmpty()
-- This script is used to export production datasets,
-- in a format consumable by the APIs.
--
-- Run with:
-- hive -hiveconf output_path=<output_path> -hiveconf username=${username} -hiveconf wiki=${wiki} -hiveconf snapshot=${monthly_snapshot} -f export_prod_data.hql
--
--
-- Format
-- * Include header: yes
-- * Field delimiter: "\t"
-- * Null value for missing recommendations
-- (image_id, confidence_rating, source fields): ""
-- * found_on: list of wikis delimited by ','
--
-- Changelog:
-- * 2021-03-31: creation.
--
--
use ${hiveconf:username};
set hivevar:null_value="";
set hivevar:found_on_delimiter=",";
set hive.cli.print.header=true;
insert overwrite local directory '${hiveconf:output_path}'
row format delimited fields terminated by '\t'
select page_id,
page_title,
nvl(image_id, ${null_value}) as image_id,
nvl(confidence_rating, ${null_value}) as confidence_rating,
nvl(source, ${null_value}) as source,
dataset_id,
insertion_ts,
wiki,
concat_ws(${found_on_delimiter}, found_on) as found_on
from imagerec_prod
where wiki = '${hiveconf:wiki}' and snapshot='${hiveconf:snapshot}' and is_article_page=true and image_id is not null;
-- This script is used to export production datasets,
-- in a format consumable by the APIs.
--
-- Data is collected locally, in TSV format, under <output_path>.
--
-- Run with:
-- hive -hiveconf output_path=<output_path> -hiveconf username=${username} -hiveconf wiki=${wiki} -hiveconf snapshot=${monthly_snapshot} -f export_prod_data.hql
--
--
-- Format
-- * Include header: yes
-- * Field delimiter: "\t"
-- * Null value for missing recommendations
-- (image_id, confidence_rating, source fields): ""
-- * found_on: list of wikis delimited by ','
--
-- Changelog:
-- * 2021-03-08: schema and format freeze.
-- * 2021-03-25: append found_on column
-- * 2021-03-25: add is_article_page to where clause
--
use ${hiveconf:username};
set hivevar:null_value="";
set hivevar:found_on_delimiter=",";
set hive.cli.print.header=true;
insert overwrite local directory '${hiveconf:output_path}'
row format delimited fields terminated by '\t'
select page_id,
page_title,
nvl(image_id, ${null_value}) as image_id,
nvl(confidence_rating, ${null_value}) as confidence_rating,
nvl(source, ${null_value}) as source,
dataset_id,
insertion_ts,
wiki,
concat_ws(${found_on_delimiter}, found_on) as found_on
from imagerec_prod
where wiki = '${hiveconf:wiki}' and snapshot='${hiveconf:snapshot}' and is_article_page=true;
-- DDL to create an external table that exposes samples of the
-- production dataset.
-- The default HDFS location and Hive database are relative to a developer's
-- username. Example: hdfs://analytics-hadoop/user/clarakosi/imagerec/data.
--
-- The dataset will be available at https://superset.wikimedia.org/superset/sqllab via the
-- `presto_analytics` database.
--
-- Execution
-- hive -hiveconf username=<username> -f external_imagerec.hql
USE ${hiveconf:username};
CREATE EXTERNAL TABLE IF NOT EXISTS `imagerec` (
`pandas_idx` string,
`item_id` string,
`page_id` string,
`page_title` string,
`top_candidates` string,
`instance_of` string)
PARTITIONED BY (
`wiki_db` string,
`snapshot` string)
STORED AS PARQUET
LOCATION
'hdfs://analytics-hadoop/user/${hiveconf:username}/imagerec';
-- Update partition metadata
MSCK REPAIR TABLE `imagerec`;
-- DDL to create an external table that exposes samples of the
-- production dataset.
-- The default HDFS location and Hive database are relative to a developer's
-- username. Example: hdfs://analytics-hadoop/user/gmodena/imagerec_prod/data.
--
-- The dataset will be available at https://superset.wikimedia.org/superset/sqllab via the
-- `presto_analytics` database.
--
-- Execution
-- hive -hiveconf username=<username> -f external_imagerec_prod.hql
USE ${hiveconf:username};
CREATE EXTERNAL TABLE IF NOT EXISTS `imagerec_prod`(
`page_id` string,
`page_title` string,
`image_id` string,
`confidence_rating` string,
`source` string,
`instance_of` string,
`is_article_page` boolean,
`dataset_id` string,
`insertion_ts` double,
`found_on` array<string>)
PARTITIONED BY (`wiki` string, `snapshot` string)
STORED AS PARQUET
LOCATION
'hdfs://analytics-hadoop/user/${hiveconf:username}/imagerec_prod';
-- Update partition metadata
MSCK REPAIR TABLE `imagerec_prod`;
-- DDL to create an external table that exposes the
-- production dataset for the search team.
-- The default HDFS location and Hive database are relative to a developer's
-- username. Example: hdfs://analytics-hadoop/user/clarakosi/search_imagerec/data.
--
-- The dataset will be available at https://superset.wikimedia.org/superset/sqllab via the
-- `presto_analytics` database.
--
-- Execution
-- hive -hiveconf username=<username> -f external_search_imagerec.hql
USE ${hiveconf:username};
CREATE EXTERNAL TABLE IF NOT EXISTS `search_imagerec`(
`wikiid` string,
`page_id` int,
`page_namespace` int,
`recommendation_type` string)
PARTITIONED BY (`year` int, `month` int, `day` int)
STORED AS PARQUET
LOCATION
'hdfs://analytics-hadoop/user/${hiveconf:username}/search_imagerec';
-- Update partition metadata
MSCK REPAIR TABLE `search_imagerec`;
CREATE TABLE t(page_id INTEGER,
page_title TEXT,
image_id TEXT,
confidence_rating TEXT,
source TEXT,
dataset_id TEXT,
insertion_ts REAL,
wiki TEXT,
found_on TEXT);
CREATE INDEX t_wiki_page_id ON t(wiki, page_id);
.mode ascii
.separator "\t" "\n"
.timer on
.import imagerec_prod_${SNAPSHOT}/prod-arwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-arzwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-bnwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-cebwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-cswiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-dewiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-enwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-eswiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-euwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-fawiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-frwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-hewiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-huwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-hywiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-itwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-kowiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-plwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-ptwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-ruwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-srwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-svwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-trwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-ukwiki-${SNAPSHOT}-wd_image_candidates.tsv t
.import imagerec_prod_${SNAPSHOT}/prod-viwiki-${SNAPSHOT}-wd_image_candidates.tsv t
from enum import Enum


class InstancesToFilter(Enum):
    YEAR = "Q577"
    CALENDARYEAR = "Q3186692"
    RECURRENTTIMEFRAME = "Q14795564"
    CENTURYLEAPYEAR = "Q3311614"
    FAMILYNAME = "Q101352"
    NAME = "Q82799"
    DISAMBIGUATION = "Q4167410"
    LIST = "Q13406463"

    @classmethod
    def list(cls):
        return [p.value for p in InstancesToFilter]
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from schema import CsvDataset

import argparse

spark = SparkSession.builder.getOrCreate()


def parse_args():
    parser = argparse.ArgumentParser(
        description="Transform raw algo output to production datasets"
    )
    parser.add_argument("--snapshot", help="Monthly snapshot date (YYYY-MM)")
    parser.add_argument("--wiki", help="Wiki name")
    parser.add_argument("--source", help="Source dataset path")
    parser.add_argument("--destination", help="Destination path")
    return parser.parse_args()


if __name__ == "__main__":