Commit 610eeadc authored by Fabian Kaelin

Merge branch 'ci-investigations' into '5-add-ci-integration-package-conda-env-wip'

Update Gitlab CI MR

See merge request !10
parents 2ed89257 31aa8665
Pipeline #3706 passed with stages in 2 minutes and 28 seconds
@@ -4,7 +4,7 @@ current_version = 0.1.0
# Create a commit using git when true.
commit = False
# Whether to create a git tag, that is the new version,
# prefixed with the character "r".
# Don't forget to git-push with the --tags flag.
tag = False
data/
*.egg-info
*.pyc
__pycache__/
\ No newline at end of file
__pycache__/
.cache/
.conda/
.coverage
.local/
build/
dist/
@@ -17,12 +17,12 @@
# publish_conda_env packages the project and its dependencies in a conda distribution.
# This is a relocatable virtual environment that can be used to
# ship the project to Spark workers.
# The knowledge-gaps archive is available at
# https://gitlab.wikimedia.org/repos/research/knowledge-gaps/-/packages/86
# Currently this job must be triggered manually from Gitlab's Pipeline UI.
include:
- project: 'repos/data-engineering/workflow_utils'
ref: 'v0.4.0'
ref: 'v0.5.0'
file: '/gitlab_ci_templates/pipelines/conda_artifact_repo.yml'
# A set of commands that will be executed before each job.
@@ -62,7 +62,7 @@ typecheck:
publish_conda_env:
before_script:
- !reference [.conda_setup_script]
- apt install -y libkrb5-dev libsasl2-dev gcc g++
script:
- !reference [.conda_dist_publish_script]
# A Dockerfile to build a buster image with Python 3 and miniconda.
# It's based atop Wikimedia's official python3 image, and its runtime
# matches the container that runs Gitlab CI.
# based on https://gitlab.wikimedia.org/repos/data-engineering/workflow_utils/-/blob/main/gitlab_ci_templates/lib/conda.yml
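#
# Example local build/run (a minimal sketch; the image tag is illustrative and the
# --platform flag mirrors the Makefile's DOCKER_PLATFORM setting):
#   docker build --platform linux/amd64 -t knowledge-gaps-ci .
#   docker run -it knowledge-gaps-ci /bin/bash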
FROM docker-registry.wikimedia.org/python3-build-buster:0.1.0-20220313
FROM docker-registry.wikimedia.org/wikimedia-buster:20210523
RUN apt update
RUN apt install -y curl gpg
RUN curl https://repo.anaconda.com/pkgs/misc/gpgkeys/anaconda.asc | gpg --dearmor > /tmp/conda.gpg
RUN install -o root -g root -m 644 /tmp/conda.gpg /usr/share/keyrings/conda-archive-keyring.gpg
RUN gpg --keyring /usr/share/keyrings/conda-archive-keyring.gpg --no-default-keyring --fingerprint 34161F5BF5EB1D4BFBBB8F0A8AEB4F8B29D82806
RUN echo "deb [arch=amd64 signed-by=/usr/share/keyrings/conda-archive-keyring.gpg] https://repo.anaconda.com/pkgs/misc/debrepo/conda stable main" > /etc/apt/sources.list.d/conda.list
ARG wmf_workflow_utils=v0.5.0
# Install a number of deps necessary to build wmfdata (and others).
RUN apt update && apt install -y gcc g++ make curl libkrb5-dev conda libsasl2-dev
# Install tox in the docker image (rather than declare it in requirements.txt)
# to match Gitlab's runtime.
RUN pip3 install tox==3.24.4
RUN echo "deb http://apt.wikimedia.org/wikimedia buster-wikimedia thirdparty/conda" >> /etc/apt/sources.list.d/wikimedia.list
RUN apt update
RUN apt install -y curl gcc g++ gpg git make ca-certificates conda
RUN apt install -y libkrb5-dev libsasl2-dev
SHELL ["/bin/bash", "-c"]
RUN source /opt/conda/etc/profile.d/conda.sh && conda activate
# RUN pip install tox==3.24.4
# RUN pip install git+https://gitlab.wikimedia.org/repos/data-engineering/workflow_utils.git@${wmf_workflow_utils}
RUN /opt/conda/bin/pip install tox==3.24.4
RUN /opt/conda/bin/pip install git+https://gitlab.wikimedia.org/repos/data-engineering/workflow_utils.git@${wmf_workflow_utils}
\ No newline at end of file
# This Makefile provides some boilerplate to run conda-dist,
# test, lint and typechecking targets.
# A docker image is available to perform all CI steps
# locally, in an environment that matches Gitlab's CI container.
#
# Example:
#
# $ make docker-conda
@@ -26,10 +26,6 @@ DOCKER_PLATFORM := linux/amd64
PROJECT_NAME := knowledge-gaps
# Do not modify PROJECT_VERSION manually. Use bump2version instead.
PROJECT_VERSION := 0.1.0
# TODO(gmodena, 2022-03-25): when development stabilizes we should track
# a release tag / wheel instead of main. I encountered occasional conda-pack
# failures while tracking main, that I was not able to consistently reproduce.
WMF_WORKFLOW_UTILS := git+https://gitlab.wikimedia.org/repos/data-engineering/workflow_utils.git@main
CONDA_DIST := ./dist/${PROJECT_NAME}-${PROJECT_VERSION}.conda-${GIT_BRANCH}-${GIT_COMMIT_HASH}
REQUIREMENTS_FILE := requirements_dev.txt
@@ -47,7 +43,7 @@ DOCKER_CMD := docker run -it \
-e SKIP_DOCKER=true \
-w /root ${DOCKER_IMG}
# The research/miniconda3 image is required to run
# CI targets in a docker container.
env: docker-conda
test: docker-conda
@@ -57,10 +53,10 @@ endif
# Create a conda dist archive, containing this project and all
# its dependencies.
env:
${DOCKER_CMD} bash -c "source /opt/conda/etc/profile.d/conda.sh; \
/opt/conda/bin/pip install ${WMF_WORKFLOW_UTILS}; \
/opt/conda/bin/conda-dist --dist-env-prefix=${CONDA_DIST}"
env:
${DOCKER_CMD} bash -c "source /opt/conda/etc/profile.d/conda.sh && \
conda activate && \
conda-dist --dist-env-prefix=${CONDA_DIST}"
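# Example usage (a minimal sketch; the archive name is illustrative and is derived
# from PROJECT_NAME, PROJECT_VERSION, GIT_BRANCH and GIT_COMMIT_HASH):
#   make docker-conda
#   make env    # writes ./dist/knowledge-gaps-0.1.0.conda-<branch>-<commit>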
# Run the pytest suite.
test:
@@ -4,6 +4,57 @@
1. Install dependencies: `pip install -r requirements_dev.txt`
2. Enable Git hooks: `git config --local core.hooksPath .githooks/`
## Running
### Metrics
Activate your Conda environment, install the current package, and pack
the environment like so:
```bash
pip install .
conda pack --ignore-editable-packages -o environment.tar.gz
```
Then run:
```bash
PYSPARK_DRIVER_PYTHON=python \
PYSPARK_PYTHON=./environment/bin/python \
spark2-submit \
--master yarn \
--driver-memory 2G \
--executor-memory 8G \
--archives environment.tar.gz#environment \
knowledge_gaps/metrics.py \
20220101 20220131 \
--projects enwiki \
--pageviews_table bmansurov.pageview_hourly_2022_01 \
--wikipedia_pages_table wikipedia_pages_2022_01 \
--mediawiki_snapshot 2022-01 \
--wikidata_snapshot 2022-01-24 \
--save_table bmansurov.metrics_20220422
```
* Run locally:
```bash
spark2-submit --master local knowledge_gaps/content_gap_pipeline.py \
--content_gap <content gap> \
--output <table name to save> \
--wikidata_qid_table <saved wikidata qid table> \
--wikidata_prop_table <saved wikidata property table> \
--wikipedia_page_table <saved wikipedia page table>
```
* Run on yarn:
```bash
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./venv/bin/python
spark2-submit --master yarn --archives <conda archive file>#venv knowledge_gaps/content_gap_pipeline.py \
--content_gap <content gap> \
--output <table name to save> \
--projects <wiki list> \
--mediawiki_snapshot <YYYY-MM> \
--wikidata_snapshot <YYYY-MM-DD>
```
## Testing
A [CI Pipeline](https://gitlab.wikimedia.org/repos/research/knowledge-gaps/-/pipelines) is triggered
@@ -12,14 +63,14 @@ after each `push` and merge request.
Under the hood we use [tox](https://pypi.org/project/tox/) for test automation.
`Makefile` provides some boilerplate to run conda-dist, test, lint and typechecking targets.
A docker image is available to perform all CI steps locally,
in an environment that matches Gitlab's CI container.
Run
```
make docker-conda
```
to generate a suitable python3 + miniconda docker image.
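
As a quick reference, here is a minimal sketch of the intended workflow; the `test`, `lint` and `typecheck` target names are assumed from the descriptions above, so check the `Makefile` for the exact names:

```
make docker-conda   # build the CI-equivalent image once
make test           # run the pytest suite inside the container
make lint           # linting
make typecheck      # static type checking
```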
- generate an `x86_64-linux` conda distribution runtime with
@@ -62,7 +113,7 @@ To help propagate version bumps across multiple file locations (`setup.py`, `Mak
For example, the following command
```
bump2version patch
```
will increase the patch version in `setup.py` and `Makefile`.
Changes will need to be manually committed to git.
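
Because `.bumpversion.cfg` sets `commit = False` and `tag = False`, here is a minimal sketch of the manual follow-up (the file list and commit message are illustrative):

```
bump2version patch
git add .bumpversion.cfg setup.py Makefile
git commit -m "Bump version to 0.1.1"
```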
# This file is required to create a conda distribution
# with conda-dist.
channels:
- defaults
- conda-forge
# channels:
# - defaults
# - conda-forge
dependencies:
- python=3.7
- python-gssapi
# - python-gssapi
@@ -3,7 +3,7 @@ from datetime import datetime, timedelta
import pyspark.sql.functions as F
import wmfdata as wmf
spark_session = wmf.spark.get_session(
spark = wmf.spark.get_session(
type='local',
app_name="knowledge-gaps"
)
@@ -17,7 +17,7 @@ today = datetime.today()
ninety_days_earlier = today - timedelta(days=90)
pageviews_df = article_features.extract_pageviews(
spark_session,
spark,
ninety_days_earlier,
today,
projects,
@@ -26,12 +26,12 @@ pageviews_df = article_features.extract_pageviews(
print(pageviews_df.head(3))
def get_pages_df(spark_session, table='aikochou.pages_20220310'):
def get_pages_df(spark, table='aikochou.pages_20220310'):
query = f"SELECT * FROM {table}"
return spark_session.sql(query)
return spark.sql(query)
pages_df = get_pages_df(spark_session)
pages_df = get_pages_df(spark)
print(pages_df.head(3))
# Here we can add content gaps to pages_df, etc.
import argparse
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, ArrayType, FloatType
import pandas as pd # type: ignore
import numpy as np # type: ignore
import re
def cml_sum(x):
return np.cumsum(np.array(x))
cml_sum_udf = F.udf(lambda x: cml_sum(x).tolist(), ArrayType(IntegerType()))
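# Running total over the per-period counts, e.g. cml_sum([1, 2, 3]) -> array([1, 3, 6]).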
def percentage(x, y):
return [x[i]/y[i]*100 if y[i]!=0 else None for i in range(len(x))]
percentage_udf = F.udf(lambda x, y: percentage(x, y), ArrayType(FloatType()))
def rename_cols(df, prefix, other_cols):
# Match both yearly ("2021") and monthly ("2021-03") suffixes produced by groupBy().agg().
new_col_name = [col(col_name).alias(prefix + re.search(r"\d+(?:-\d+)?", col_name).group(0))
for col_name in df.columns if col_name.startswith("sum")]
return df.select(*other_cols, *new_col_name)
def add_percentages(df, metric_name):
df = df.withColumn(metric_name+'_pct', percentage_udf(col(metric_name), col(metric_name+'_sum')))
df = df.withColumn(metric_name+'_cml_pct', percentage_udf(col(metric_name+'_cml'), col(metric_name+'_sum_cml')))
return df
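# Illustrative (hypothetical) values: metric = [2, 3] with metric_sum = [10, 20]
# gives metric_pct = [20.0, 15.0]; metric_cml = [2, 5] with metric_sum_cml = [10, 30]
# gives metric_cml_pct ≈ [20.0, 16.7]. A zero denominator yields None.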
def get_df(spark, table):
"Temporary function for easy pages retrieval during development."
query = f"SELECT * FROM {table}"
return spark.sql(query)
def main(args):
spark = SparkSession.builder.getOrCreate()
tbl = get_df(spark, args.content_gap_table)
if args.period == "monthly":
tbl = tbl.withColumn("month", F.substring("create_timestamp", 1, 7))
t_range = [m.strftime('%Y-%m') for m in pd.date_range(start=args.start_time, end=args.end_time, freq='M')]
expr = [F.when(F.col("month") == t, 1).otherwise(0).alias(args.metric+'_'+t) for t in t_range]
elif args.period == "yearly":
tbl = tbl.withColumn("year", F.substring("create_timestamp", 1, 4))
t_range = [str(y) for y in range(int(args.start_time), int(args.end_time)+1)]
expr = [F.when(F.col("year") == t, 1).otherwise(0).alias(args.metric+'_'+t) for t in t_range]
tbl = tbl.select("wiki_db",
"page_id",
"qitem_id",
"page_title",
"is_human",
args.content_gap,
*expr).cache()
t_col_names = [args.metric+'_'+t for t in t_range]
aggr_exprs = {col_name: "sum" for col_name in t_col_names}
sum_col_names = [args.metric+'_sum_'+t for t in t_range]
if args.content_gap in ("gender", "sexual_orientation"):
## metric ##
exploded = (tbl
.where(col("is_human")==True)
.withColumn("gap", F.explode(args.content_gap))
.select("wiki_db",
"page_id",
"qitem_id",
"page_title",
*t_col_names,
F.col("gap."+args.content_gap)
.alias(args.content_gap)).cache())
grouped = exploded.groupBy("wiki_db", args.content_gap).agg(aggr_exprs)
grouped = rename_cols(grouped, args.metric+'_', ["wiki_db", args.content_gap])
grouped = grouped.withColumn(args.metric, F.array(*t_col_names)).drop(*t_col_names) # arrayize
grouped = grouped.withColumn(args.metric+'_cml', cml_sum_udf(col(args.metric))) # cumulative
## biography articles ##
bio_total = tbl.where(col("is_human")==True).groupBy("wiki_db").agg(aggr_exprs)
bio_total = rename_cols(bio_total, args.metric+'_sum_', ["wiki_db"])
bio_total = bio_total.withColumn(args.metric+'_sum', F.array(*sum_col_names)).drop(*sum_col_names) # arrayize
bio_total = bio_total.withColumn(args.metric+'_sum_cml', cml_sum_udf(col(args.metric+'_sum'))) # cumulative
joined = grouped.join(bio_total, "wiki_db", "left")
result = add_percentages(joined, args.metric)
#joined.write.mode('overwrite').saveAsTable('aikochou.article_count_20220401')
result.show()
elif args.content_gap == "geographic":
## all articles ##
total = tbl.groupBy("wiki_db").agg(aggr_exprs)
total = rename_cols(total, args.metric+'_sum_', ["wiki_db"])
total = total.withColumn(args.metric+'_sum', F.array(*sum_col_names)).drop(*sum_col_names) # arrayize
total = total.withColumn(args.metric+'_sum_cml', cml_sum_udf(col(args.metric+'_sum'))) # cumulative
## metric ##
model = "geospatial_model"
if model == "cultural_model":
### geographic.cultural_model.region ###
exploded = (tbl
.withColumn('gap', F.explode('geographic.cultural_model'))
.select("wiki_db",
"page_id",
"qitem_id",
"page_title",
*t_col_names,
F.col("gap.region").alias("region")).cache())
grouped = exploded.groupBy("wiki_db", "region").agg(aggr_exprs)
grouped = rename_cols(grouped, args.metric+'_', ["wiki_db", "region"])
grouped = grouped.withColumn(args.metric, F.array(*t_col_names)).drop(*t_col_names) # arrayize
grouped = grouped.withColumn(args.metric+'_cml', cml_sum_udf(col(args.metric))) # cumulative
joined = grouped.join(total, "wiki_db", "left")
result = add_percentages(joined, args.metric)
result.show()
elif model == "geospatial_model":
### geographic.geospatial_model ###
exploded = (tbl
.withColumn('gap', F.explode('geographic.geospatial_model'))
.select("wiki_db",
"page_id",
"qitem_id",
"page_title",
*t_col_names,
F.col('gap.geocode.country_code').alias('country_code'),
F.col('gap.continent').alias('continent'),
F.col('gap.sub_continent').alias('sub_continent')).cache())
grouped = exploded.groupBy("wiki_db", "country_code", "continent", "sub_continent").agg(aggr_exprs)
grouped = rename_cols(grouped, args.metric+'_', ["wiki_db",
"country_code",
"continent",
"sub_continent"]).cache()
categories = ["sub_continent", "continent"]
geospatial = []
for category in categories:
cat_grouped = grouped.groupBy("wiki_db", category).agg(aggr_exprs)
cat_grouped = rename_cols(cat_grouped, args.metric+'_', ["wiki_db", category])
cat_grouped = cat_grouped.withColumn(args.metric, F.array(*t_col_names)).drop(*t_col_names) # arrayize
cat_grouped = cat_grouped.withColumn(args.metric+'_cml', cml_sum_udf(col(args.metric))) # cumulative
geospatial.append(cat_grouped)
grouped = grouped.drop(*categories)
grouped = grouped.withColumn(args.metric, F.array(*t_col_names)).drop(*t_col_names) # arrayize
grouped = grouped.withColumn(args.metric+'_cml', cml_sum_udf(col(args.metric))) # cumulative
geospatial.append(grouped)
geospatial = [x.join(total, "wiki_db", "left") for x in geospatial]
results = [add_percentages(x, args.metric) for x in geospatial]
for r in results:
r.show()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--content_gap',
default='geographic',
help='gender, sexual_orientation, geographic or time')
parser.add_argument('--metric',
default='article',
help='article or pageview')
parser.add_argument('--period',
default='yearly',
help='"monthly" or "yearly"')
parser.add_argument('--start_time',
default='2001',
help='Count articles starting this year'
' or month (%%Y-%%m)')
parser.add_argument('--end_time',
default='2021',
help='Count articles ending this year'
' or month (%%Y-%%m)')
parser.add_argument('--content_gap_table',
default='aikochou.content_gap_feature_20220401',
help='Content gap feature table.')
args = parser.parse_args()
print(args)
main(args)
\ No newline at end of file
@@ -64,10 +64,10 @@ def extract_templates(wikicode):
return None
@F.udf(returnType="struct<links:array<string>,num_refs:int,headings:array<string>,templates:array<string>>")
def extract_wikitext_attributes(wikitext):
"""
The Python functions at the top operate on the wikicode, which allows us to write a UDF that parses the wikitext only once.
returnType: "struct<links:array<string>,num_refs:int,headings:array<string>,templates:array<string>>"
"""
features = {}
wikicode = mwparserfromhell.parse(wikitext)
@@ -78,7 +78,35 @@ def extract_wikitext_attributes(wikitext):
return Row(**features)
def extract_pageviews(spark_session, start_date, end_date,
def wikitext_df(spark, mediawiki_snapshot, projects=None):
"""
Retrieve wikitext for all Wikipedia pages for the given projects. Default is all wiki projects.
root
|-- wiki_db: string (nullable = true)
|-- page_id: long (nullable = true)
|-- page_title: string (nullable = true)
|-- revision_id: long (nullable = true)
|-- revision_text: string (nullable = true)
"""
query = f"""
SELECT mw.wiki_db, mw.page_id, mw.page_title, mw.revision_id, mw.revision_text
FROM wmf.mediawiki_wikitext_current mw
LEFT JOIN wmf_raw.mediawiki_page p
ON p.page_id = mw.page_id
AND p.wiki_db = mw.wiki_db
WHERE mw.snapshot = '{mediawiki_snapshot}'
AND mw.page_namespace = 0
AND p.snapshot = '{mediawiki_snapshot}'
AND p.page_namespace = 0
AND p.page_is_redirect = 0
"""
df = spark.sql(query)
if projects:
df = df.where(F.col('wiki_db').isin(projects))
return df
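# Example usage (a minimal sketch; snapshot and project values are illustrative,
# assuming `spark` is an active SparkSession):
#   df = wikitext_df(spark, '2022-01', projects=['enwiki'])
#   df.printSchema()  # matches the schema documented above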
def extract_pageviews(spark, start_date, end_date,
projects=None, table="wmf.pageview_hourly"):
"""Extract the number of pageviews between START_DATE and END_DATE
for PROJECTS. Supply a smaller TABLE for faster queries during
@@ -86,7 +114,7 @@ Parameters
Parameters
----------
spark_session : SparkSession
spark : SparkSession
start_date : datetime.datetime
Start date for counting pageviews.
@@ -131,7 +159,7 @@ GROUP BY
GROUP BY
project, page_title, page_id, year, month, day
"""
df = spark_session.sql(query)
df = spark.sql(query)
if projects:
df = df.where(F.col('wiki_db').isin(projects))
return df
import argparse
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from knowledge_gaps import util, func
def get_df(spark, table):
"Temporary function for easy pages retrieval during development."
query = f"SELECT * FROM {table}"
return spark.sql(query)
def main(args, f):
spark = SparkSession.builder.getOrCreate()
if args.wikidata_qid_table:
qids = get_df(spark, args.wikidata_qid_table)
else:
qids = func.wikidata_relevant_qitems_df(spark,
args.wikidata_snapshot,
args.mediawiki_snapshot,
args.projects)
if args.wikidata_prop_table:
prop = get_df(spark, args.wikidata_prop_table)
else:
prop = func.wikidata_relevant_properties(spark,
args.wikidata_snapshot,
qids).cache()
if args.wikipedia_page_table:
pages = get_df(spark, args.wikipedia_page_table)
else:
pages = func.wikipedia_pages_df(spark,
args.mediawiki_snapshot,
args.wikidata_snapshot,
args.projects)
qids = func.append_is_human(qids, prop)
if args.content_gap == "gender":
gender_cat = util.get_category(f['gender_cat'][0])
qid_to_gender = spark.createDataFrame(
gender_cat, ['qid', 'gender'])
qids = func.append_gender_feature(qids,
prop,
qid_to_gender)
elif args.content_gap == "sexual_orientation":
sexual_orientation_cat = util.get_category(f['sexual_orientation_cat'][0])
qid_to_label = spark.createDataFrame(sexual_orientation_cat,
['qid', 'sexual_orientation'])
qids = func.append_sexual_orientation_feature(qids,
prop,
qid_to_label)
elif args.content_gap == "geographic":
continent = util.get_continent(f['continent'][0])
cc_to_continent = spark.createDataFrame(continent, ['country_name',
'country_code',
'continent',
'sub_continent'])
region_cat = util.get_qid_to_region(f['region_qids'][0], f['country_aggr'][0])
qid_to_region = spark.createDataFrame(region_cat.items(),
['qid', 'region'])
region_prop = util.get_property(f['country_prop'][0])
region_prop_list = [p[0] for p in region_prop]
qids = func.append_geographic_feature(qids,
prop,
qid_to_region,
cc_to_continent,
region_prop_list)
elif args.content_gap == "time":
time_prop = util.get_property(f['time_prop'][0])
time_prop_list = [p[0] for p in time_prop]
qids = func.append_time_feature(qids,
prop,
time_prop_list)
prop.unpersist()