Commit 0fe8e0ee authored by Gmodena, committed by GitHub

T277776 add found on wiki (#13)

* Extract a list of wikis from the note column.

* Fix missing note record mock

* store imagerec_prod as parquet

* Add found_on column to prod dataset

* Remove white spaces from found_on entries

* Fix: reformat style

* Add validation and EDA on found_on column

* Store the output of hive locally.

`hive -f` output contains some Parquet log noise that is
written to stdout and was redirected into the dataset.

The export query and dataset generation logic have been
modified to save data locally, without redirecting the
query result set through stdout.

* Gracefully stop the Spark session before exiting ETL scripts.

* Fix: notebook JSON post-merge clutter

* Fix metrics notebook and merge with main.

* Clear notebook output

* Fix duplicated field in DDL

* Add EOL to Hive queries

* Add termination after CREATE DDL
parent e37946cc
%% Cell type:code id: tags:
``` python
import re
import pickle
import pandas as pd
import math
import numpy as np
import random
import requests
#from bs4 import BeautifulSoup
import json
import os
from wmfdata.spark import get_session
```
%% Cell type:code id: tags:
``` python
!which python
```
%% Cell type:code id: tags:
``` python
qids_and_properties={}
```
%% Cell type:code id: tags:parameters
``` python
# Pass in directory to place output files
output_dir = 'Output'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
# Pass in the full snapshot date
snapshot = '2020-12-28'
# Allow the passing of a single language as a parameter
language = 'arwiki'
# A spark session type determines the resource pool
# to initialise on yarn
spark_session_type = 'regular'
```
%% Cell type:code id: tags:
``` python
# We use wmfdata boilerplate to init a spark session.
# Under the hood the library uses findspark to initialise
# Spark's environment. pyspark imports will be available
# after initialisation
spark = get_session(type=spark_session_type, app_name="ImageRec-DEV Training")
import pyspark
import pyspark.sql
```
%% Cell type:code id: tags:
``` python
languages=['enwiki','arwiki','kowiki','cswiki','viwiki','frwiki','fawiki','ptwiki','ruwiki','trwiki','plwiki','hewiki','svwiki','ukwiki','huwiki','hywiki','srwiki','euwiki','arzwiki','cebwiki','dewiki','bnwiki'] #language editions to consider
#val=100 #threshold above which we consider images as non-icons
languages=[language]
```
%% Cell type:code id: tags:
``` python
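# Extract the year-month prefix from the snapshot date, e.g. '2020-12-28' -> '2020-12'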
reg = r'^([\w]+-[\w]+)'
short_snapshot = re.match(reg, snapshot).group()
short_snapshot
```
%% Cell type:code id: tags:
``` python
!ls /home/gmodena/ImageMatching/conf/metrics.properties
```
%% Cell type:code id: tags:
``` python
len(languages)
```
%% Cell type:code id: tags:
``` python
def get_threshold(wiki_size):
    #change th to optimize precision vs recall. recommended val for accuracy = 5
    sze, th, lim = 50000, 15, 4
    if (wiki_size >= sze):
        #if wiki_size > base size, scale threshold by (log of ws/bs) + 1
        return (math.log(wiki_size/sze, 10)+1)*th
    #else scale th down by ratio bs/ws, w min possible val of th = th/limiting val
    return max((wiki_size/sze) * th, th/lim)
```
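%% Cell type:markdown id: tags:
A quick illustration (not part of the original pipeline) of how `get_threshold` scales: below the 50,000-article base size the threshold shrinks linearly with a floor of `th/lim`; above it, it grows with the log of the size ratio.
%% Cell type:code id: tags:
``` python
# Sanity check of the threshold scaling; the sample wiki sizes are arbitrary.
for wiki_size in [1000, 50000, 500000, 5000000]:
    print(wiki_size, round(get_threshold(wiki_size), 2))
```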
%% Cell type:code id: tags:
``` python
val={}
total={}
for wiki in languages:
    querytot="""SELECT COUNT(*) as c
    FROM wmf_raw.mediawiki_page
    WHERE page_namespace=0
    AND page_is_redirect=0
    AND snapshot='"""+short_snapshot+"""'
    AND wiki_db='"""+wiki+"""'"""
    wikisize = spark.sql(querytot).toPandas()
    val[wiki]=get_threshold(int(wikisize['c']))
    total[wiki]=int(wikisize['c'])
```
%% Cell type:code id: tags:
``` python
val
```
%% Cell type:code id: tags:
``` python
total
```
%% Cell type:code id: tags:
``` python
wikisize
```
%% Cell type:markdown id: tags:
The query below retrieves, for each unillustrated article: its Wikidata ID, the image of the Wikidata ID (if any), the Commons category of the Wikidata ID (if any), and the lead images of the articles in other languages (if any).

* `allowed_images` contains the list of icons (images appearing in more than `val` articles)
* `image_pageids` contains the list of illustrated articles (articles with images that are not icons)
* `noimage_pages` contains the pageid and Qid of unillustrated articles
* `qid_props` contains, for each Qid in `noimage_pages`, the values of the following properties, when present:
  * P18: the item's image
  * P373: the item's Commons category
  * P31: the item's "instance of" property
* `category_image_list` contains the list of all images in a Commons category in `qid_props`
* `lan_page_images` contains the list of lead images in Wikipedia articles in all languages linked to each Qid
* `qid_props_with_image_list` is `qid_props` plus the list of images in the Commons category linked to the Wikidata item
%% Cell type:code id: tags:
``` python
for wiki in languages:
    print(wiki)
    queryd="""WITH allowed_images AS
(
SELECT il_to
FROM wmf_raw.mediawiki_imagelinks
WHERE il_from_namespace=0
AND snapshot='"""+short_snapshot+"""'
AND wiki_db='"""+wiki+"""'
AND il_to not like '%\"%' AND il_to not like '%,%'
GROUP BY il_to
HAVING COUNT(il_to)>"""+str(val[wiki])+"""),
image_pageids AS
(SELECT DISTINCT il_from as pageid
FROM wmf_raw.mediawiki_imagelinks il1
LEFT ANTI JOIN allowed_images
ON allowed_images.il_to=il1.il_to
WHERE il1.il_from_namespace=0
AND il1.wiki_db='"""+wiki+"""'
AND il1.snapshot='"""+short_snapshot+"""'
),
pageimage_pageids AS
(
SELECT DISTINCT pp_page as pageid
FROM wmf_raw.mediawiki_page_props pp
WHERE pp.wiki_db ='"""+wiki+"""'
AND pp.snapshot='"""+short_snapshot+"""'
AND pp_propname in ('page_image','page_image_free')),
all_image_pageids as(
SELECT pageid
FROM image_pageids
UNION
SELECT pageid
FROM pageimage_pageids
),
noimage_pages as
(
SELECT wipl.item_id,p.page_id,p.page_title,page_len
FROM wmf_raw.mediawiki_page p
JOIN wmf.wikidata_item_page_link wipl
ON p.page_id=wipl.page_id
LEFT ANTI JOIN all_image_pageids
on all_image_pageids.pageid=wipl.page_id
WHERE p.page_namespace=0
AND page_is_redirect=0 AND p.wiki_db='"""+wiki+"""'
AND p.snapshot='"""+short_snapshot+"""'
AND wipl.snapshot='"""+snapshot+"""'
AND wipl.page_namespace=0
AND wipl.wiki_db='"""+wiki+"""'
ORDER BY page_len desc
),
qid_props AS
(
SELECT we.id,label_val,
MAX(CASE WHEN claim.mainSnak.property = 'P18' THEN claim.mainSnak.datavalue.value ELSE NULL END) AS hasimage,
MAX(CASE WHEN claim.mainSnak.property = 'P373' THEN REPLACE(REPLACE(claim.mainSnak.datavalue.value,'\"',''),' ','_') ELSE NULL END) AS commonscategory,
MAX(CASE WHEN claim.mainSnak.property = 'P31' THEN claim.mainSnak.datavalue.value ELSE NULL END) AS instanceof
FROM wmf.wikidata_entity we
JOIN noimage_pages
ON we.id=noimage_pages.item_id
LATERAL VIEW explode(labels) t AS label_lang,label_val
LATERAL VIEW OUTER explode(claims) c AS claim
WHERE t.label_lang='en'
AND typ='item'
AND snapshot='"""+snapshot+"""'
AND claim.mainSnak.property in ('P18','P31','P373')
GROUP BY id,label_val
),
category_image_list AS
(
SELECT cl_to,concat_ws(';',collect_list(mp.page_title)) as category_imagelist
from qid_props
left join wmf_raw.mediawiki_categorylinks mc
on qid_props.commonscategory=mc.cl_to
join wmf_raw.mediawiki_page mp
on mp.page_id=mc.cl_from
WHERE mp.wiki_db ='commonswiki'
AND mp.snapshot='"""+short_snapshot+"""'
AND mp.page_namespace=6
AND mp.page_is_redirect=0
AND mc.snapshot='"""+short_snapshot+"""'
AND mc.wiki_db ='commonswiki'
AND mc.cl_type='file'
group by mc.cl_to
),
qid_props_with_image_list AS
(
SELECT id, label_val, hasimage, commonscategory, instanceof,category_imagelist
from qid_props
left join category_image_list
on qid_props.commonscategory=category_image_list.cl_to
),
lan_page_images AS
(
SELECT nip.item_id,nip.page_id,nip.page_title,nip.page_len,collect_list(concat(pp.wiki_db,': ',pp.pp_value)) as lan_images
FROM noimage_pages nip
LEFT JOIN wmf.wikidata_item_page_link wipl
LEFT JOIN wmf_raw.mediawiki_page_props pp
LEFT JOIN wmf_raw.mediawiki_page mp
ON nip.item_id=wipl.item_id
AND wipl.page_id=pp.pp_page
AND wipl.wiki_db=pp.wiki_db
AND mp.page_title=pp.pp_value
WHERE wipl.wiki_db !='"""+wiki+"""'
AND wipl.snapshot='"""+snapshot+"""'
AND wipl.page_namespace=0
AND pp.snapshot='"""+short_snapshot+"""'
AND pp_propname in ('page_image','page_image_free')
AND mp.wiki_db ='commonswiki'
AND mp.snapshot='"""+short_snapshot+"""'
AND mp.page_namespace=6
AND mp.page_is_redirect=0
GROUP BY nip.item_id,nip.page_id,nip.page_title,nip.page_len
),
joined_lan_page_images AS
(
SELECT nip.item_id,nip.page_id,nip.page_title,nip.page_len, lpi.lan_images
from noimage_pages nip
LEFT JOIN lan_page_images lpi
on nip.item_id=lpi.item_id
)
SELECT * from joined_lan_page_images
LEFT JOIN qid_props_with_image_list
on qid_props_with_image_list.id=joined_lan_page_images.item_id
"""
    qid_props = spark.sql(queryd).toPandas()
    qids_and_properties[wiki]=qid_props
```
%% Cell type:markdown id: tags:
Below I am just creating different tables according to whether an image is retrieved from a specific source (Wikidata image, Commons category, or interlanguage links).
%% Cell type:code id: tags:
``` python
hasimage={}
commonscategory={}
lanimages={}
allimages={}
for wiki in languages:
    print(wiki)
    hasimage[wiki]=qids_and_properties[wiki][qids_and_properties[wiki]['hasimage'].astype(str).ne('None')]
    commonscategory[wiki]=qids_and_properties[wiki][qids_and_properties[wiki]['category_imagelist'].astype(str).ne('None')]
    lanimages[wiki]=qids_and_properties[wiki][qids_and_properties[wiki]['lan_images'].astype(str).ne('None')]
    print("number of unillustrated articles: "+str(len(qids_and_properties[wiki])))
    print("number of articles with a Wikidata image: "+str(len(hasimage[wiki])))
    print("number of articles with a Wikidata Commons category: "+str(len(commonscategory[wiki])))
    print("number of articles with language links: "+str(len(lanimages[wiki])))
    ####
    allimages[wiki]=qids_and_properties[wiki]
```
%% Cell type:markdown id: tags:
Below are the two functions that select images depending on the source:
* `select_image_language` takes as input the list of images from articles in multiple languages and selects the one used most often across languages (after some major filtering)
* `select_image_category` selects at random one of the images in the Commons category linked to the Wikidata item.
%% Cell type:code id: tags:
``` python
def image_language_checks(iname):
    #list of substrings to check for
    substring_list=['.svg','flag','noantimage','no_free_image','image_manquante',
                    'replace_this_image','disambig','regions','map','default',
                    'defaut','falta_imagem_','imageNA','noimage','noenzyimage']
    iname=iname.lower()
    if any(map(iname.__contains__, substring_list)):
        return False
    else:
        return True

def select_image_language(imagelist):
    counts={} #contains counts of image occurrences across languages
    languages={} #contains which languages cover a given image
    #for each image
    for image in imagelist:
        data=image.strip().split(' ') #this contains the language and image name data
        ###
        if len(data)==2: #if we actually have 2 fields
            iname=data[1].strip()
            lan=data[0].strip()[:-1]
            ###
            if iname not in counts: #if this image does not exist in our counts yet, initialize counts
                if not image_language_checks(iname): #if the image name is not valid
                    continue
                # urll = 'https://commons.wikimedia.org/wiki/File:'+iname.replace(' ','_')+'?uselang='+language
                # page = requests.get(urll)
                # if page.status_code == 404:
                #     print (urll)
                #     continue
                counts[iname]=1
                languages[iname]=[]
            else:
                counts[iname]+=1
                languages[iname].append(lan)
    return languages

def select_image_category(imagelist):
    data=list(imagelist.strip().split(';'))
    data=[d for d in data if d.find('.')!=-1]
    return random.choice(data)
```
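%% Cell type:markdown id: tags:
A small usage example (illustrative only): entries in `lan_images` have the form `wiki_db: image_name` (built by `concat(pp.wiki_db,': ',pp.pp_value)` in the query above), so `select_image_language` strips the trailing colon from the wiki name and tallies each valid image name.
%% Cell type:code id: tags:
``` python
# Hypothetical input; image names are made up.
sample = ['enwiki: Cat.jpg', 'frwiki: Cat.jpg', 'dewiki: Hund.jpg', 'itwiki: Map.svg']
select_image_language(sample)
# 'Cat.jpg' is counted twice (with 'frwiki' recorded), 'Hund.jpg' once
# (empty language list, since the first sighting records no language),
# and 'Map.svg' is filtered out by image_language_checks.
```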
%% Cell type:markdown id: tags:
Below is the priority assignment process:
* If the article has a Wikidata image (not a flag, as this is likely a duplicate), give it priority 1
* Choose up to 3 images among the ones from related Wikipedia articles in other languages, using the `select_image_language` function, and give priority 2.x, where `x` is a ranking given by the number of languages using that image
* If the article has an associated Commons category, call the `select_image_category` function, randomly selecting up to 3 images from that category
%% Cell type:code id: tags:
``` python
stats={}
data_small={}
####
for wiki in languages:
    selected=[] #stores selected images for each article
    notes=[] #stores information about the source where the candidate image was drawn from
    wikis=[]
    data_small[wiki]=allimages[wiki].sample(len(allimages[wiki]))
    language=wiki.replace('wiki','')
    #rtl=direction[wiki] #right to left -> rtl; left to right -> ltr
    for wikipedia in data_small[wiki]['lan_images']:
        if str(wikipedia)!='None':
            lg=select_image_language(wikipedia)
            if len(lg)==0:
                lg=None
            wikis.append(lg)
        else:
            wikis.append(None)
    data_small[wiki]['wikipedia_imagelist']=wikis
    for wikidata,commons,wikipedia,jdata in zip(data_small[wiki]['hasimage'],data_small[wiki]['category_imagelist'],data_small[wiki]['wikipedia_imagelist'],data_small[wiki]['instanceof']):
        if jdata is not None:
            qid=json.loads(jdata)["numeric-id"]
            if qid in [4167410,577,13406463]:
                selected.append(None)
                notes.append(None)
                continue
        image=None
        tier={}
        note={}
        if str(commons)!='None':
            for i in range(min(len(list(commons.strip().split(';'))),3)):
                image=select_image_category(commons)
                tier[image]=3
                note[image]='image was found in the Commons category linked in the Wikidata item'
        ###
        if str(wikipedia)!='None':
            index=np.argsort([len(l) for l in list(wikipedia.values())])
            #print(wikipedia)
            for i in range(min(len(wikipedia),3)):
                image=list(wikipedia.keys())[index[-(i+1)]]
                tier[image]=2+(float(i)/10)
                note[image]='image was found in the following Wikis: '+', '.join(wikipedia[image])
        if str(wikidata)!='None' and wikidata.lower().find('flag')==-1:
            image=wikidata[1:-1]
            tier[image]=1
            note[image]='image was in the Wikidata item'
        selected.append(tier if len(tier)>0 else None)
        notes.append(note if len(note)>0 else None)
        # if image is not None:
        #     properties.append(get_properties(image,language,rtl,page))
        # else:
        #     properties.append([None,None,None,None,None,None,None,None,None])
    #updating table
    data_small[wiki]['selected']=selected
    data_small[wiki]['notes']=notes
    data_small[wiki]['good_interlinks']=wikis
    #TODO(REMOVE FROM repo) data_small[wiki]=data_small[wiki][data_small[wiki]['selected'].astype(str).ne('None')]
    #print("total number of articles: "+str(total[wiki]))
    #print("number of unillustrated articles: "+str(len(qids_and_properties[wiki])))
    #print("number of articles with at least 1 recommendation: "+str(len(data_small[wiki])))
    #stats[wiki]=[total[wiki],len(qids_and_properties[wiki]),len(data_small[wiki]),len(all3images),len(hasimage),len(commonscategory),len(lanimages)]
```
%% Cell type:code id: tags:
``` python
#the final selection process: select up to 3 images per candidate and their relative confidence score (1=high, 2=medium, 3=low)
#based on the priorities assigned earlier
for wiki in languages:
    top_candidates=[]
    for selected,notes in zip(data_small[wiki]['selected'],data_small[wiki]['notes']):
        if selected is not None:
            index=np.argsort([l for l in list(selected.values())])
            candidates=[]
            for i in range(min(len(index),3)):
                image=list(selected.keys())[index[i]]
                rating=selected[image]
                note=notes[image]
                candidates.append({'image':image,'rating':rating,'note':note})
            top_candidates.append(candidates)
        else:
            top_candidates.append(None)
    data_small[wiki]['top_candidates']=top_candidates
    data_small[wiki][['item_id','page_id','page_title','top_candidates', 'instanceof']].to_csv(output_dir+'/'+wiki+'_'+snapshot+'_wd_image_candidates.tsv',sep='\t')
```
%% Cell type:code id: tags:
``` python
spark.stop()
```
......
import pytest
from etl.transform import RawDataset
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
@pytest.fixture(scope="session")
......@@ -31,12 +33,12 @@ def raw_data(spark_session):
"Q66666",
"523523",
"Some page with 3 suggestions",
'['
'{"image": "image2.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki"}, '
"["
'{"image": "image2.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki,arwiki,enwiki"}, '
'{"image": "image3.jpg", "rating": 1, "note": "image was in the Wikidata item"}, '
'{"image": "image4.jpg", "rating": 3.0, "note": "image was found in the Commons category linked in '
'the Wikidata item"} '
']',
"]",
'{"entity-type":"item","numeric-id":577,"id":"Q577"}',
"enwiki",
"2020-12",
......@@ -44,3 +46,21 @@ def raw_data(spark_session):
],
RawDataset.schema,
)
@pytest.fixture(scope="session")
def wikis(spark_session: SparkSession) -> DataFrame:
return spark_session.createDataFrame(
[
["image was found in the following Wikis: ruwiki, itwiki,enwiki"],
["image was found in the following Wikis: "],
[None],
],
["note"],
)
def assert_shallow_equals(ddf: DataFrame, other_ddf: DataFrame) -> None:
assert len(set(ddf.columns).difference(set(other_ddf.columns))) == 0
assert ddf.subtract(other_ddf).rdd.isEmpty()
assert other_ddf.subtract(ddf).rdd.isEmpty()
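A minimal sketch, not part of this change, of how the `wikis` fixture could exercise the `found_on` parsing introduced in `etl/transform.py` (it mirrors the regexp_extract/regexp_replace/split chain shown further below; the test name and assertion are hypothetical):

``` python
def test_found_on_extraction(wikis):
    from pyspark.sql import functions as F

    # Same expression as ImageRecommendation.found_on: grab the wiki list
    # after "Wikis:", drop whitespace, and split on commas.
    parsed = wikis.select(
        F.split(
            F.regexp_replace(
                F.regexp_extract(F.col("note"), r"Wikis:\s+(.*)$", 1), r"\s+", ""
            ),
            ",",
        ).alias("found_on")
    )
    rows = [r.found_on for r in parsed.collect()]
    assert rows[0] == ["ruwiki", "itwiki", "enwiki"]
```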
%% Cell type:code id:loaded-atlantic tags:
``` python
%pylab inline
```
%% Cell type:code id:impressed-fourth tags:
``` python
import pyspark.sql
import pandas as pd
import os
import getpass
from pyspark.sql import functions as F
```
%% Cell type:code id:deluxe-mailman tags:parameters
``` python
# Create output directory
output_dir = "Data_Metrics_Output"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
snapshot = "2021-01"
username = getpass.getuser()
```
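%% Cell type:markdown id: tags:
The cells below use a `spark` session that is never initialised in this excerpt; a minimal sketch, assuming the same wmfdata boilerplate as the training notebook:
%% Cell type:code id: tags:
``` python
from wmfdata.spark import get_session

# Hypothetical init cell; the app_name is illustrative.
spark = get_session(type='regular', app_name="ImageRec-DEV Metrics")
```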
%% Cell type:markdown id:improving-jonathan tags:
### Total number of records (per wiki)
%% Cell type:code id:engaged-inflation tags:
``` python
query = """SELECT wiki AS Wiki, snapshot, COUNT(*) as `Number of Records`
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, snapshot
ORDER BY wiki"""
total_number_of_records = spark.sql(query).toPandas()
```
%% Cell type:code id:lucky-vocabulary tags:
``` python
total_number_of_records
```
%% Cell type:code id:activated-worker tags:
``` python
total_number_of_records.to_csv(output_dir+"/"+"Total number of records")
```
%% Cell type:markdown id:intimate-penny tags:
### Population statistics
%% Cell type:code id:arabic-casting tags:
``` python
population_stat = total_number_of_records['Number of Records'].describe()
population_stat.to_csv(output_dir+"/"+"Population statistics")
population_stat
```
%% Cell type:code id:friendly-leonard tags:
``` python
total_number_of_records.boxplot(column=['Number of Records'])
```
%% Cell type:code id:loose-throw tags:
``` python
pop_stat_median = pd.DataFrame(data={"Median": [total_number_of_records["Number of Records"].median()]})
pop_stat_median.to_csv(output_dir+"/"+"Population statistics median")
pop_stat_median
```
%% Cell type:code id:neither-coating tags:
``` python
pop_stat_mode = total_number_of_records['Number of Records'].mode()
pop_stat_mode.to_csv(output_dir+"/"+"Population statistics mode")
pop_stat_mode
```
%% Cell type:markdown id:banner-criticism tags:
### Total number of images per page
%% Cell type:code id:lesbian-angel tags:
``` python
query = """SELECT wiki AS Wiki, page_id as `Page ID`, COUNT(*) as `Number of Images`
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id
ORDER BY wiki, page_id"""
total_number_of_images_per_page = spark.sql(query).toPandas()
```
%% Cell type:code id:polar-click tags:
``` python
total_number_of_images_per_page.to_csv(output_dir+"/"+"Total number of images per page")
total_number_of_images_per_page
```
%% Cell type:markdown id:front-ratio tags:
#### Breakdown of the number of images being suggested for each page
%% Cell type:markdown id:awful-stuart tags:
Keep in mind that pages without an image suggestion will appear as 1, since unillustrated pages are kept in the dataset as a single row with null image fields.
%% Cell type:code id:neither-emphasis tags:
``` python
query = """SELECT number_of_images AS `Image Suggestions`, count(*) AS `Pages`
FROM (
SELECT wiki, page_id, COUNT(*) as number_of_images
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id
) AS expr_qry
GROUP BY number_of_images
ORDER BY number_of_images"""
breakdown_of_image_sug_per_page = spark.sql(query).toPandas()
```
%% Cell type:code id:assisted-startup tags:
``` python
breakdown_of_image_sug_per_page.set_index('Image Suggestions', inplace=True)
breakdown_of_image_sug_per_page.to_csv(output_dir+"/"+"Breakdown of image sug per page")
breakdown_of_image_sug_per_page
```
%% Cell type:code id:complicated-delay tags:
``` python
breakdown_of_image_sug_per_page.plot(y="Pages",
title="Breakdown of Images Suggestion Per Page",
autopct="%.2f",
figsize=(6, 6),
kind="pie");
```
%% Cell type:markdown id:downtown-manner tags:
Breakdown of image suggestion data by confidence rating.
A rating of None indicates that the page has no image suggestion.
%% Cell type:code id:generic-priority tags:
``` python
query = """SELECT wiki AS Wiki, confidence_rating AS `Confidence Rating`, COUNT(*) AS `Image Suggestions`
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY Wiki, `Confidence Rating`
ORDER BY Wiki, `Confidence Rating`"""
breakdown_of_image_sug_by_confidence_score = spark.sql(query).toPandas()
```
%% Cell type:code id:impressive-failure tags:
``` python
breakdown_of_image_sug_by_confidence_score.to_csv(output_dir+"/"+"Breakdown of image sug by conf rating")
breakdown_of_image_sug_by_confidence_score
```
%% Cell type:markdown id:executive-theory tags:
#### Get articles with more than 3 image suggestions
Assuming no errors, this table should be empty.
%% Cell type:code id:fiscal-poverty tags:
``` python
query = """WITH large_image_sug AS
(SELECT wiki, page_id, COUNT(*)
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id
HAVING COUNT(*) > 3)
SELECT p.*
FROM gmodena.imagerec_prod p
JOIN large_image_sug
ON large_image_sug.wiki = p.wiki
AND large_image_sug.page_id = p.page_id
AND p.snapshot='"""+snapshot+"""'
ORDER BY p.wiki, p.page_id, p.image_id"""
articles_with_more_image_sug = spark.sql(query).toPandas()
```
%% Cell type:code id:metallic-visibility tags:
``` python
articles_with_more_image_sug.to_csv(output_dir+"/"+"Articles with more than 3 sug")
articles_with_more_image_sug
```
%% Cell type:markdown id:invalid-trader tags:
### Size and counts of intermediate and final datasets
%% Cell type:code id:integrated-spell tags:
``` python
query = """SELECT wiki_db AS `Wiki`, snapshot, COUNT(*) AS `Raw Number of Records`
FROM gmodena.imagerec
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki_db, snapshot
ORDER BY wiki_db"""
raw_total_number_of_records = spark.sql(query).toPandas()
```
%% Cell type:code id:apparent-marble tags:
``` python
raw_total_number_of_records
```
%% Cell type:code id:aquatic-selling tags:
``` python
total_number_of_records = total_number_of_records.rename(columns={"Number of Records": "Final Number of Records"})
result = pd.merge(raw_total_number_of_records, total_number_of_records, on=["Wiki", "snapshot"])
```
%% Cell type:code id:supreme-monday tags:
``` python
result.to_csv(output_dir+"/"+"Counts of intermediate and final datasets")
result
```
%% Cell type:code id:green-intellectual tags:
``` python
result.plot(x="Wiki",
y=["Raw Number of Records", "Final Number of Records"],
title="Comparison of intermediate and final number of records",
figsize=(6, 6),
kind="bar")
```
%% Cell type:markdown id:supposed-nigeria tags:
### Number of articles with and without valid "instance of"
TODO: update snapshot and table name so they can be passed in as parameters.
%% Cell type:code id:regulation-rental tags:
``` python
query = """SELECT wiki_db, snapshot,
COUNT(instance_of) AS with_instance_of,
SUM(CASE WHEN instance_of IS NULL then 1 ELSE 0 END) AS without_instance_of
FROM gmodena.imagerec_parquet
WHERE snapshot = '2021-01'
GROUP BY wiki_db, snapshot
ORDER BY wiki_db"""
instance_of_metrics = spark.sql(query).toPandas()
```
%% Cell type:code id:offensive-underwear tags:
``` python
instance_of_metrics.to_csv(output_dir+"/"+"Number of articles with and without valid instance_of")
```
%% Cell type:markdown id:chronic-clothing tags:
### Number of redirect articles
Validate that no "page redirects" are present in the dataset.
%% Cell type:code id:taken-ordinary tags:
``` python
query = f"""
select im.snapshot, count(*) as page_redirect from {username}.imagerec im
join wmf_raw.mediawiki_page as mp
where im.wiki_db = mp.wiki_db
and cast(im.page_id as string) = cast(mp.page_id as string)
and im.snapshot = mp.snapshot
and mp.page_is_redirect = 1
and im.wiki_db != '' and im.snapshot >= "{snapshot}"
group by im.snapshot"""
page_redirect = spark.sql(query).toPandas()
page_redirect.to_csv(os.path.join(output_dir, "Page redirects"))
```
%% Cell type:markdown id:logical-bedroom tags:
### Number of records filtered out
%% Cell type:code id:decreased-scope tags:
``` python
query = """SELECT wiki, snapshot,
SUM(CASE WHEN is_article_page = True THEN 1 ELSE 0 END ) as "Final number of records",
SUM(CASE WHEN is_article_page = False THEN 1 ELSE 0 END ) as "Number of records filtered out"
FROM {username}.imagerec_prod
GROUP BY wiki, snapshot"""
filtered_out_records = spark.sql(query).toPandas()
filtered_out_records.to_csv(output_dir+"/"+"Number of records filtered out")
```
%% Cell type:markdown id:military-parks tags:
### Distribution of found_on wikis
%% Cell type:code id:accepting-customer tags:
``` python
query = f"""SELECT wiki, page_id, size(found_on) as num_languages
FROM {username}.imagerec_prod
WHERE wiki != ''
AND snapshot = '{snapshot}'
AND source='wikipedia'
GROUP BY wiki, page_id, found_on"""
found_on_all_wikis = spark.sql(query)
# Summarize data to a frequency count of num_languages
found_on_freq = found_on_all_wikis.groupby("num_languages").count().toPandas()
found_on_freq.to_csv(os.path.join(output_dir, "found_on frequencies"))
```
%% Cell type:code id:seven-shuttle tags:
``` python
found_on_all_wikis.select('num_languages').describe().show()
```
%% Cell type:code id:professional-anaheim tags:
``` python
found_on_all_wikis.approxQuantile("num_languages", [0.5], 0.25)
```
%% Cell type:markdown id:acting-thing tags:
#### EDA on found_on
%% Cell type:code id:artistic-wrist tags:
``` python
found_on_freq = found_on_all_wikis.groupby("num_languages").count().toPandas()
```
%% Cell type:code id:bronze-stress tags:
``` python
found_on_freq['count'].plot(kind='bar', figsize=(30, 20), title="Number of languages - frequency distribution")
```
%% Cell type:code id:needed-model tags:
``` python
wiki="enwiki"
found_on_metrics = found_on_all_wikis.where(F.col('wiki') == wiki).toPandas()
cond = found_on_metrics["wiki"] == wiki
found_on_metrics[cond].boxplot("num_languages")
```
%% Cell type:code id:hydraulic-farmer tags:
``` python
```
......
......@@ -10,14 +10,20 @@
-- * Field delimiter: "\t"
-- * Null value for missing recommendations
-- (image_id, confidence_rating, source fields): ""
-- * found_on: list of wikis delimited by ','
--
-- Changelog:
-- * 2021-03-08: schema and format freeze.
-- * 2021-03-25: append found_on column
-- * 2021-03-25: add is_article_page to where clause
--
use ${hiveconf:username};
set hivevar:null_value="";
set hivevar:found_on_delimiter=",";
set hive.cli.print.header=true;
insert overwrite local directory '${hiveconf:output_path}'
row format delimited fields terminated by '\t'
select page_id,
page_title,
nvl(image_id, ${null_value}) as image_id,
......@@ -25,6 +31,8 @@ select page_id,
nvl(source, ${null_value}) as source,
dataset_id,
insertion_ts,
wiki
wiki,
concat_ws(${found_on_delimiter}, found_on) as found_on
from imagerec_prod
where wiki = '${hiveconf:wiki}' and snapshot='${hiveconf:snapshot}' and is_article_page=true
......@@ -23,8 +23,8 @@ PARTITIONED BY (
`snapshot` string)
STORED AS PARQUET
LOCATION
'hdfs://analytics-hadoop/user/${hiveconf:username}/imagerec'
TBLPROPERTIES ("skip.header.line.count"="1");
'hdfs://analytics-hadoop/user/${hiveconf:username}/imagerec';
-- Update partition metadata
MSCK REPAIR TABLE `imagerec`;
\ No newline at end of file
MSCK REPAIR TABLE `imagerec`;
......@@ -19,7 +19,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `imagerec_prod`(
`instance_of` string,
`is_article_page` boolean,
`dataset_id` string,
`insertion_ts` double)
`insertion_ts` double,
`found_on` array<string>)
PARTITIONED BY (`wiki` string, `snapshot` string)
STORED AS PARQUET
LOCATION
......@@ -27,3 +28,4 @@ LOCATION
-- Update partition metadata
MSCK REPAIR TABLE `imagerec_prod`;
......@@ -41,3 +41,4 @@ if __name__ == "__main__":
).parquet(
destination
) # Requires dynamic partitioning enabled
spark.stop()
......@@ -32,13 +32,16 @@ class ImageRecommendation:
)
)
instance_of: Column = (
F.when(
F.col("instance_of").isNull(),
F.lit(None)
)
.otherwise(
F.from_json("instance_of", RawDataset.instance_of_schema).getItem("id")
instance_of: Column = F.when(F.col("instance_of").isNull(), F.lit(None)).otherwise(
F.from_json("instance_of", RawDataset.instance_of_schema).getItem("id")
)
found_on: Column = F.when(F.col("note").isNull(), F.lit(None)).otherwise(
F.split(
F.regexp_replace(
F.regexp_extract(F.col("note"), "Wikis:\s+(.*)$", 1), "\s+", ""
),
",",
)
)
......@@ -54,7 +57,7 @@ class ImageRecommendation:
self.dataFrame = dataFrame
if not dataFrame.schema == RawDataset.schema:
raise AttributeError(
f"Invalid schema. Expected '{RawDataset.schema}'. Got '{dataFrame.schema}"
f"Invalid schema. Expected '{RawDataset.schema}'. Got '{dataFrame.schema}"
)
def transform(self) -> DataFrame:
......@@ -71,6 +74,7 @@ class ImageRecommendation:
.withColumnRenamed("image", "image_id")
.withColumn("confidence_rating", self.confidence_rating)
.withColumn("source", self.source)
.withColumn("found_on", self.found_on)
.select(
"wiki",
"page_id",
......@@ -79,6 +83,7 @@ class ImageRecommendation:
"confidence_rating",
"source",
"instance_of",
"found_on",
)
)
without_recommendations = (
......@@ -87,6 +92,7 @@ class ImageRecommendation:
.withColumn("image_id", F.lit(None))
.withColumn("confidence_rating", F.lit(None))
.withColumn("source", F.lit(None))
.withColumn("found_on", F.lit(None))
.select(
"wiki",
"page_id",
......@@ -95,6 +101,7 @@ class ImageRecommendation:
"confidence_rating",
"source",
"instance_of",
"found_on",
)
)
......@@ -104,12 +111,18 @@ class ImageRecommendation:
def parse_args():
parser = argparse.ArgumentParser(description='Transform raw algo output to production datasets')
parser.add_argument('--snapshot', help='Montlhy snapshot date (YYYY-MM)')
parser.add_argument('--source', help='Source dataset path')
parser.add_argument('--destination', help='Destination path')
parser.add_argument('--dataset-id', help='Production dataset identifier (optional)', default=str(uuid.uuid4()),
dest='dataset_id')
parser = argparse.ArgumentParser(
description="Transform raw algo output to production datasets"
)
parser.add_argument("--snapshot", help="Montlhy snapshot date (YYYY-MM)")