Commit e37946cc authored by Clarakosi's avatar Clarakosi Committed by GitHub
Browse files

Add a list of instances to filter (#14)

* Add a list of instances to filter

* Update ddl/export_prod_data.hql changelog

* Add metrics and move filter list to enum class

* Update column name to is_article_page
parent 4ac2c22c
%% Cell type:code id:impressed-fourth tags: %% Cell type:code id:impressed-fourth tags:
``` python ``` python
import pyspark.sql import pyspark.sql
import pandas as pd import pandas as pd
import os import os
import getpass import getpass
``` ```
%% Cell type:code id:deluxe-mailman tags:parameters %% Cell type:code id:deluxe-mailman tags:parameters
``` python ``` python
# Create output directory # Create output directory
output_dir = "Data_Metrics_Output" output_dir = "Data_Metrics_Output"
if not os.path.exists(output_dir): if not os.path.exists(output_dir):
os.makedirs(output_dir) os.makedirs(output_dir)
snapshot = "2021-01" snapshot = "2021-01"
username = getpass.getuser() username = getpass.getuser()
``` ```
%% Cell type:markdown id:improving-jonathan tags: %% Cell type:markdown id:improving-jonathan tags:
### Total number of records (per wiki) ### Total number of records (per wiki)
%% Cell type:code id:engaged-inflation tags: %% Cell type:code id:engaged-inflation tags:
``` python ``` python
query = """SELECT wiki AS Wiki, snapshot, COUNT(*) as `Number of Records` query = """SELECT wiki AS Wiki, snapshot, COUNT(*) as `Number of Records`
FROM gmodena.imagerec_prod FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""' WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, snapshot GROUP BY wiki, snapshot
ORDER BY wiki""" ORDER BY wiki"""
total_number_of_records = spark.sql(query).toPandas() total_number_of_records = spark.sql(query).toPandas()
``` ```
%% Cell type:code id:lucky-vocabulary tags: %% Cell type:code id:lucky-vocabulary tags:
``` python ``` python
total_number_of_records total_number_of_records
``` ```
%% Cell type:code id:activated-worker tags: %% Cell type:code id:activated-worker tags:
``` python ``` python
total_number_of_records.to_csv(output_dir+"/"+"Total number of records") total_number_of_records.to_csv(output_dir+"/"+"Total number of records")
``` ```
%% Cell type:markdown id:intimate-penny tags: %% Cell type:markdown id:intimate-penny tags:
### Population statistics ### Population statistics
%% Cell type:code id:arabic-casting tags: %% Cell type:code id:arabic-casting tags:
``` python ``` python
population_stat = total_number_of_records['Number of Records'].describe() population_stat = total_number_of_records['Number of Records'].describe()
population_stat.to_csv(output_dir+"/"+"Population statistics") population_stat.to_csv(output_dir+"/"+"Population statistics")
population_stat population_stat
``` ```
%% Cell type:code id:friendly-leonard tags: %% Cell type:code id:friendly-leonard tags:
``` python ``` python
total_number_of_records.boxplot(column=['Number of Records']) total_number_of_records.boxplot(column=['Number of Records'])
``` ```
%% Cell type:code id:loose-throw tags: %% Cell type:code id:loose-throw tags:
``` python ``` python
pop_stat_median = pd.DataFrame(data={"Median": [total_number_of_records["Number of Records"].median()]}) pop_stat_median = pd.DataFrame(data={"Median": [total_number_of_records["Number of Records"].median()]})
pop_stat_median.to_csv(output_dir+"/"+"Population statistics median") pop_stat_median.to_csv(output_dir+"/"+"Population statistics median")
pop_stat_median pop_stat_median
``` ```
%% Cell type:code id:neither-coating tags: %% Cell type:code id:neither-coating tags:
``` python ``` python
pop_stat_mode = total_number_of_records['Number of Records'].mode() pop_stat_mode = total_number_of_records['Number of Records'].mode()
pop_stat_mode.to_csv(output_dir+"/"+"Population statistics mode") pop_stat_mode.to_csv(output_dir+"/"+"Population statistics mode")
pop_stat_mode pop_stat_mode
``` ```
%% Cell type:markdown id:banner-criticism tags: %% Cell type:markdown id:banner-criticism tags:
### Total number of images per page ### Total number of images per page
%% Cell type:code id:lesbian-angel tags: %% Cell type:code id:lesbian-angel tags:
``` python ``` python
query = """SELECT wiki AS Wiki, page_id as `Page ID`, COUNT(*) as `Number of Images` query = """SELECT wiki AS Wiki, page_id as `Page ID`, COUNT(*) as `Number of Images`
FROM gmodena.imagerec_prod FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""' WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id GROUP BY wiki, page_id
ORDER BY wiki, page_id""" ORDER BY wiki, page_id"""
total_number_of_images_per_page = spark.sql(query).toPandas() total_number_of_images_per_page = spark.sql(query).toPandas()
``` ```
%% Cell type:code id:polar-click tags: %% Cell type:code id:polar-click tags:
``` python ``` python
total_number_of_images_per_page.to_csv(output_dir+"/"+"Total number of images per page") total_number_of_images_per_page.to_csv(output_dir+"/"+"Total number of images per page")
total_number_of_images_per_page total_number_of_images_per_page
``` ```
%% Cell type:markdown id:front-ratio tags: %% Cell type:markdown id:front-ratio tags:
#### Breakdown of the number of images being suggested for each page #### Breakdown of the number of images being suggested for each page
%% Cell type:markdown id:awful-stuart tags: %% Cell type:markdown id:awful-stuart tags:
Keep in mind that pages without an image suggestion will appear as 1. Keep in mind that pages without an image suggestion will appear as 1.
%% Cell type:code id:neither-emphasis tags: %% Cell type:code id:neither-emphasis tags:
``` python ``` python
query = """SELECT number_of_images AS `Image Suggestions`, count(*) AS `Pages` query = """SELECT number_of_images AS `Image Suggestions`, count(*) AS `Pages`
FROM ( FROM (
SELECT wiki, page_id, COUNT(*) as number_of_images SELECT wiki, page_id, COUNT(*) as number_of_images
FROM gmodena.imagerec_prod FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""' WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id GROUP BY wiki, page_id
) AS expr_qry ) AS expr_qry
GROUP BY number_of_images GROUP BY number_of_images
ORDER BY number_of_images""" ORDER BY number_of_images"""
breakdown_of_image_sug_per_page = spark.sql(query).toPandas() breakdown_of_image_sug_per_page = spark.sql(query).toPandas()
``` ```
%% Cell type:code id:assisted-startup tags: %% Cell type:code id:assisted-startup tags:
``` python ``` python
breakdown_of_image_sug_per_page.set_index('Image Suggestions', inplace=True) breakdown_of_image_sug_per_page.set_index('Image Suggestions', inplace=True)
breakdown_of_image_sug_per_page.to_csv(output_dir+"/"+"Breakdown of image sug per page") breakdown_of_image_sug_per_page.to_csv(output_dir+"/"+"Breakdown of image sug per page")
breakdown_of_image_sug_per_page breakdown_of_image_sug_per_page
``` ```
%% Cell type:code id:complicated-delay tags: %% Cell type:code id:complicated-delay tags:
``` python ``` python
breakdown_of_image_sug_per_page.plot(y="Pages", breakdown_of_image_sug_per_page.plot(y="Pages",
title="Breakdown of Images Suggestion Per Page", title="Breakdown of Images Suggestion Per Page",
autopct="%.2f", autopct="%.2f",
figsize=(6, 6), figsize=(6, 6),
kind="pie"); kind="pie");
``` ```
%% Cell type:markdown id:downtown-manner tags: %% Cell type:markdown id:downtown-manner tags:
Breakdown of image suggestion data by confidence rating. Breakdown of image suggestion data by confidence rating.
A rating of None indicates that the page has no image suggestion A rating of None indicates that the page has no image suggestion
%% Cell type:code id:generic-priority tags: %% Cell type:code id:generic-priority tags:
``` python ``` python
query = """SELECT wiki AS Wiki, confidence_rating AS `Confidence Rating`, COUNT(*) AS `Image Suggestions` query = """SELECT wiki AS Wiki, confidence_rating AS `Confidence Rating`, COUNT(*) AS `Image Suggestions`
FROM gmodena.imagerec_prod FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""' WHERE snapshot='"""+snapshot+"""'
GROUP BY Wiki, `Confidence Rating` GROUP BY Wiki, `Confidence Rating`
ORDER BY Wiki, `Confidence Rating`""" ORDER BY Wiki, `Confidence Rating`"""
breakdown_of_image_sug_by_confidence_score = spark.sql(query).toPandas() breakdown_of_image_sug_by_confidence_score = spark.sql(query).toPandas()
``` ```
%% Cell type:code id:impressive-failure tags: %% Cell type:code id:impressive-failure tags:
``` python ``` python
breakdown_of_image_sug_by_confidence_score.to_csv(output_dir+"/"+"Breakdown of image sug by conf rating") breakdown_of_image_sug_by_confidence_score.to_csv(output_dir+"/"+"Breakdown of image sug by conf rating")
breakdown_of_image_sug_by_confidence_score breakdown_of_image_sug_by_confidence_score
``` ```
%% Cell type:markdown id:executive-theory tags: %% Cell type:markdown id:executive-theory tags:
#### Get articles with more than 3 image suggestions #### Get articles with more than 3 image suggestions
Assuming no error this table should be empty Assuming no error this table should be empty
%% Cell type:code id:fiscal-poverty tags: %% Cell type:code id:fiscal-poverty tags:
``` python ``` python
query = """WITH large_image_sug AS query = """WITH large_image_sug AS
(SELECT wiki, page_id, COUNT(*) (SELECT wiki, page_id, COUNT(*)
FROM gmodena.imagerec_prod FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""' WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id GROUP BY wiki, page_id
HAVING COUNT(*) > 3) HAVING COUNT(*) > 3)
SELECT p.* SELECT p.*
FROM gmodena.imagerec_prod p FROM gmodena.imagerec_prod p
JOIN large_image_sug JOIN large_image_sug
ON large_image_sug.wiki = p.wiki ON large_image_sug.wiki = p.wiki
AND large_image_sug.page_id = p.page_id AND large_image_sug.page_id = p.page_id
AND p.snapshot='"""+snapshot+"""' AND p.snapshot='"""+snapshot+"""'
ORDER BY p.wiki, p.page_id, p.image_id""" ORDER BY p.wiki, p.page_id, p.image_id"""
articles_with_more_image_sug = spark.sql(query).toPandas() articles_with_more_image_sug = spark.sql(query).toPandas()
``` ```
%% Cell type:code id:metallic-visibility tags: %% Cell type:code id:metallic-visibility tags:
``` python ``` python
articles_with_more_image_sug.to_csv(output_dir+"/"+"Articles with more than 3 sug") articles_with_more_image_sug.to_csv(output_dir+"/"+"Articles with more than 3 sug")
articles_with_more_image_sug articles_with_more_image_sug
``` ```
%% Cell type:markdown id:invalid-trader tags: %% Cell type:markdown id:invalid-trader tags:
### Size and counts of intermediate and final datasets ### Size and counts of intermediate and final datasets
%% Cell type:code id:integrated-spell tags: %% Cell type:code id:integrated-spell tags:
``` python ``` python
query = """SELECT wiki_db AS `Wiki`, snapshot, COUNT(*) AS `Raw Number of Records` query = """SELECT wiki_db AS `Wiki`, snapshot, COUNT(*) AS `Raw Number of Records`
FROM gmodena.imagerec FROM gmodena.imagerec
WHERE snapshot='"""+snapshot+"""' WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki_db, snapshot GROUP BY wiki_db, snapshot
ORDER BY wiki_db""" ORDER BY wiki_db"""
raw_total_number_of_records = spark.sql(query).toPandas() raw_total_number_of_records = spark.sql(query).toPandas()
``` ```
%% Cell type:code id:apparent-marble tags: %% Cell type:code id:apparent-marble tags:
``` python ``` python
raw_total_number_of_records raw_total_number_of_records
``` ```
%% Cell type:code id:aquatic-selling tags: %% Cell type:code id:aquatic-selling tags:
``` python ``` python
total_number_of_records = total_number_of_records.rename(columns={"Number of Records": "Final Number of Records"}) total_number_of_records = total_number_of_records.rename(columns={"Number of Records": "Final Number of Records"})
result = pd.merge(raw_total_number_of_records, total_number_of_records, on=["Wiki", "snapshot"]) result = pd.merge(raw_total_number_of_records, total_number_of_records, on=["Wiki", "snapshot"])
``` ```
%% Cell type:code id:supreme-monday tags: %% Cell type:code id:supreme-monday tags:
``` python ``` python
result.to_csv(output_dir+"/"+"Counts of intermediate and final datasets") result.to_csv(output_dir+"/"+"Counts of intermediate and final datasets")
result result
``` ```
%% Cell type:code id:green-intellectual tags: %% Cell type:code id:green-intellectual tags:
``` python ``` python
result.plot(x="Wiki", result.plot(x="Wiki",
y=["Raw Number of Records", "Final Number of Records"], y=["Raw Number of Records", "Final Number of Records"],
title="Comparison of intermediate and final number of records", title="Comparison of intermediate and final number of records",
figsize=(6, 6), figsize=(6, 6),
kind="bar") kind="bar")
``` ```
%% Cell type:markdown id:supposed-nigeria tags: %% Cell type:markdown id:supposed-nigeria tags:
### Number of articles with and without valid "instance of" ### Number of articles with and without valid "instance of"
Todo: Update snapshot and table name to be passed in parameters Todo: Update snapshot and table name to be passed in parameters
%% Cell type:code id:regulation-rental tags: %% Cell type:code id:regulation-rental tags:
``` python ``` python
query = """SELECT wiki_db, snapshot, query = """SELECT wiki_db, snapshot,
COUNT(instance_of) AS with_instance_of, COUNT(instance_of) AS with_instance_of,
SUM(CASE WHEN instance_of IS NULL then 1 ELSE 0 END) AS without_instance_of SUM(CASE WHEN instance_of IS NULL then 1 ELSE 0 END) AS without_instance_of
FROM gmodena.imagerec_parquet FROM gmodena.imagerec_parquet
WHERE snapshot = '2021-01' WHERE snapshot = '2021-01'
GROUP BY wiki_db, snapshot GROUP BY wiki_db, snapshot
ORDER BY wiki_db""" ORDER BY wiki_db"""
instance_of_metrics = spark.sql(query).toPandas() instance_of_metrics = spark.sql(query).toPandas()
``` ```
%% Cell type:code id:offensive-underwear tags: %% Cell type:code id:offensive-underwear tags:
``` python ``` python
instance_of_metrics.to_csv(output_dir+"/"+"Number of articles with and without valid instance_of") instance_of_metrics.to_csv(output_dir+"/"+"Number of articles with and without valid instance_of")
``` ```
%% Cell type:code id:chronic-clothing tags: %% Cell type:code id:chronic-clothing tags:
``` python ``` python
### Number of redirect articles ### Number of redirect articles
Validate that no "page redirects" are present in the dataset. Validate that no "page redirects" are present in the dataset.
``` ```
%% Cell type:code id:taken-ordinary tags: %% Cell type:code id:taken-ordinary tags:
``` python ``` python
query = f""" query = f"""
select im.snapshot, count(*) as page_redirect from {username}.imagerec im select im.snapshot, count(*) as page_redirect from {username}.imagerec im
join wmf_raw.mediawiki_page as mp join wmf_raw.mediawiki_page as mp
where im.wiki_db = mp.wiki_db where im.wiki_db = mp.wiki_db
and cast(im.page_id as string) = cast(mp.page_id as string) and cast(im.page_id as string) = cast(mp.page_id as string)
and im.snapshot = mp.snapshot and im.snapshot = mp.snapshot
and mp.page_is_redirect = 1 and mp.page_is_redirect = 1
and im.wiki_db != '' and im.snapshot >= "{snapshot}" and im.wiki_db != '' and im.snapshot >= "{snapshot}"
group by im.snapshot""" group by im.snapshot"""
page_redirect = spark.sql(query).toPandas() page_redirect = spark.sql(query).toPandas()
page_redirect.to_csv(os.path.join(output_dir, "Page redirects")) page_redirect.to_csv(os.path.join(output_dir, "Page redirects"))
``` ```
%% Cell type:markdown id: tags:
### Number of records filtered out
%% Cell type:code id: tags:
``` python
# Count, per wiki and snapshot, how many rows are kept (is_article_page is
# true) versus filtered out (is_article_page is false).
# The query has to be an f-string so that {username} is interpolated into the
# table name, and aliases containing spaces are backtick-quoted (as in the
# other queries of this notebook) because Spark SQL treats double-quoted text
# as a string literal rather than an identifier.
query = f"""SELECT wiki, snapshot,
SUM(CASE WHEN is_article_page = True THEN 1 ELSE 0 END) AS `Final number of records`,
SUM(CASE WHEN is_article_page = False THEN 1 ELSE 0 END) AS `Number of records filtered out`
FROM {username}.imagerec_prod
GROUP BY wiki, snapshot"""
filtered_out_records = spark.sql(query).toPandas()
filtered_out_records.to_csv(output_dir+"/"+"Number of records filtered out")
```
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
-- --
-- Changelog: -- Changelog:
-- * 2021-03-08: schema and format freeze. -- * 2021-03-08: schema and format freeze.
-- * 2021-03-25: add is_article_page to where clause
-- --
use ${hiveconf:username}; use ${hiveconf:username};
set hivevar:null_value=""; set hivevar:null_value="";
...@@ -26,4 +27,4 @@ select page_id, ...@@ -26,4 +27,4 @@ select page_id,
insertion_ts, insertion_ts,
wiki wiki
from imagerec_prod from imagerec_prod
where wiki = '${hiveconf:wiki}' and snapshot='${hiveconf:snapshot}' where wiki = '${hiveconf:wiki}' and snapshot='${hiveconf:snapshot}' and is_article_page=true
...@@ -16,22 +16,14 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `imagerec_prod`( ...@@ -16,22 +16,14 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `imagerec_prod`(
`image_id` string, `image_id` string,
`confidence_rating` string, `confidence_rating` string,
`source` string, `source` string,
`instance_of` string,
`is_article_page` boolean,
`dataset_id` string, `dataset_id` string,
`insertion_ts` float) `insertion_ts` double)
PARTITIONED BY (`wiki` string, `snapshot` string) PARTITIONED BY (`wiki` string, `snapshot` string)
ROW FORMAT SERDE STORED AS PARQUET
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t',
'serialization.null.format'='""')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION LOCATION
'hdfs://analytics-hadoop/user/${hiveconf:username}/imagerec_prod/data'; 'hdfs://analytics-hadoop/user/${hiveconf:username}/imagerec_prod';
-- Update partition metadata -- Update partition metadata
MSCK REPAIR TABLE `imagerec_prod`; MSCK REPAIR TABLE `imagerec_prod`;
from enum import Enum
class InstancesToFilter(Enum):
    """Identifiers of "instance of" values that mark a page for filtering.

    Each member's value is the raw identifier string compared against the
    ``instance_of`` column (presumably Wikidata item IDs -- confirm against
    the data source).
    """

    YEAR = "Q577"
    CALENDARYEAR = "Q3186692"
    DISAMBIGUATION = "Q4167410"
    LIST = "Q13406463"

    @classmethod
    def list(cls):
        """Return the identifier strings of all members, in definition order."""
        # Iterate over ``cls`` instead of naming the class explicitly so the
        # classmethod keeps working if the class is ever renamed.
        return [member.value for member in cls]
...@@ -3,6 +3,7 @@ from pyspark.sql import Column, DataFrame ...@@ -3,6 +3,7 @@ from pyspark.sql import Column, DataFrame
from pyspark.sql import functions as F from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType from pyspark.sql.types import IntegerType
from schema import RawDataset from schema import RawDataset
from instances_to_filter import InstancesToFilter
import argparse import argparse
import uuid import uuid
...@@ -41,6 +42,14 @@ class ImageRecommendation: ...@@ -41,6 +42,14 @@ class ImageRecommendation:
) )
) )
# Boolean column expression: False when the row's `instance_of` value is one
# of the identifiers returned by InstancesToFilter.list(), True in every
# other case (the `otherwise` branch).
is_article_page: Column = (
F.when(
F.col("instance_of").isin(InstancesToFilter.list()),
F.lit(False)
)
.otherwise(True)
)
def __init__(self, dataFrame: DataFrame): def __init__(self, dataFrame: DataFrame):
self.dataFrame = dataFrame self.dataFrame = dataFrame
if not dataFrame.schema == RawDataset.schema: if not dataFrame.schema == RawDataset.schema:
...@@ -89,7 +98,9 @@ class ImageRecommendation: ...@@ -89,7 +98,9 @@ class ImageRecommendation:
) )
) )
return with_recommendations.union(without_recommendations).withColumn("instance_of", self.instance_of) return with_recommendations.union(without_recommendations)\
.withColumn("instance_of", self.instance_of)\
.withColumn("is_article_page", self.is_article_page)
def parse_args(): def parse_args():
......
...@@ -111,7 +111,7 @@ echo "Generating production data" ...@@ -111,7 +111,7 @@ echo "Generating production data"
STARTTIME=${SECONDS} STARTTIME=${SECONDS}
hdfs_imagerec_prod=/user/${username}/imagerec_prod/data hdfs_imagerec_prod=/user/${username}/imagerec_prod
spark2-submit --properties-file ${spark_config} --files etl/schema.py etl/transform.py \ spark2-submit --properties-file ${spark_config} --files etl/schema.py etl/transform.py \
--snapshot ${monthly_snapshot} \ --snapshot ${monthly_snapshot} \
--source ${hdfs_imagerec} \ --source ${hdfs_imagerec} \
......
...@@ -17,6 +17,7 @@ def test_etl(raw_data): ...@@ -17,6 +17,7 @@ def test_etl(raw_data):
"image_id", "image_id",
"confidence_rating", "confidence_rating",
"instance_of", "instance_of",
"is_article_page",
"source", "source",
} }
) )
...@@ -59,3 +60,14 @@ def test_etl(raw_data): ...@@ -59,3 +60,14 @@ def test_etl(raw_data):
) )
assert len(rows) == 1 assert len(rows) == 1
assert rows[0]["instance_of"] == expected_instance_of assert rows[0]["instance_of"] == expected_instance_of
# Pages are correctly marked for filtering
expected_page_id = "523523"
filter_out_rows = (
ddf.where(~F.col("is_article_page"))
.select("page_id")
.distinct()
.collect()
)
assert len(filter_out_rows) == 1
assert filter_out_rows[0]["page_id"] == expected_page_id
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment