Commit 7712d9f4 authored by Clarakosi, committed by GitHub

Implement parsing of “instance of” fields in ImageMatching production datasets (#9)

* Update transform.py to parse "instance of" json blob

* Update tests and fix transform.py schema changes

* Simplify parsing logic, add metrics, and update tests

* Updates based on code review
parent 292c864a
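For context, a minimal sketch of the parsing this commit introduces: each raw row may carry an "instance of" JSON blob (see the `Q577` fixture in the test diff below), and the transform extracts the Wikidata `id` with `from_json`. The schema string and sample blob are copied from this diff; the standalone session setup is illustrative.

``` python
# Minimal sketch of the "instance of" parsing added in this commit.
# Schema string and sample blob are taken verbatim from the diff below.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

instance_of_schema = "struct<`entity-type`:string,`numeric-id`:bigint,id:string>"
df = spark.createDataFrame(
    [('{"entity-type":"item","numeric-id":577,"id":"Q577"}',), (None,)],
    ["instance_of"],
)
parsed = df.withColumn(
    "instance_of",
    F.when(F.col("instance_of").isNull(), F.lit(None)).otherwise(
        F.from_json("instance_of", instance_of_schema).getItem("id")
    ),
)
parsed.show()  # one row with "Q577", one with null
```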
......@@ -12,7 +12,7 @@ def raw_data(spark_session):
"44444",
"Some page with suggestions",
'[{"image": "image1.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki"}]',
"",
None,
"arwiki",
"2020-12",
),
......@@ -22,7 +22,7 @@ def raw_data(spark_session):
"55555",
"Some page with no suggestion",
None,
"",
None,
"arwiki",
"2020-12",
),
......@@ -32,11 +32,12 @@ def raw_data(spark_session):
"523523",
"Some page with 3 suggestions",
'['
' {"image": "image2.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki"}, '
'{"image": "image2.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki"}, '
'{"image": "image3.jpg", "rating": 1, "note": "image was in the Wikidata item"}, '
'{"image": "image4.jpg", "rating": 3.0, "note": "image was found in the Commons category linked in the Wikidata item"}'
'{"image": "image4.jpg", "rating": 3.0, "note": "image was found in the Commons category linked in '
'the Wikidata item"} '
']',
"",
'{"entity-type":"item","numeric-id":577,"id":"Q577"}',
"enwiki",
"2020-12",
),
......
%% Cell type:code id:requested-karaoke tags:
%% Cell type:code id:impressed-fourth tags:
``` python
import os  # assumed missing; needed for the output-directory check below

import pyspark.sql
import pandas as pd
```
%% Cell type:code id:provincial-southeast tags:parameters
%% Cell type:code id:deluxe-mailman tags:parameters
``` python
# Create output directory
output_dir = "Data_Metrics_Output"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
snapshot = "2021-01"
```
%% Cell type:markdown id:incorporate-registration tags:
%% Cell type:markdown id:improving-jonathan tags:
### Total number of records (per wiki)
%% Cell type:code id:ranking-gibraltar tags:
%% Cell type:code id:engaged-inflation tags:
``` python
query = """SELECT wiki AS Wiki, snapshot, COUNT(*) as `Number of Records`
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, snapshot
ORDER BY wiki"""
total_number_of_records = spark.sql(query).toPandas()
```
%% Cell type:code id:dangerous-conservative tags:
%% Cell type:code id:lucky-vocabulary tags:
``` python
total_number_of_records
```
%% Cell type:code id:standard-special tags:
%% Cell type:code id:activated-worker tags:
``` python
total_number_of_records.to_csv(output_dir+"/"+"Total number of records")
```
%% Cell type:markdown id:romance-superintendent tags:
%% Cell type:markdown id:intimate-penny tags:
### Population statistics
%% Cell type:code id:freelance-florence tags:
%% Cell type:code id:arabic-casting tags:
``` python
population_stat = total_number_of_records['Number of Records'].describe()
population_stat.to_csv(output_dir+"/"+"Population statistics")
population_stat
```
%% Cell type:code id:hispanic-standard tags:
%% Cell type:code id:friendly-leonard tags:
``` python
total_number_of_records.boxplot(column=['Number of Records'])
```
%% Cell type:code id:patent-scale tags:
%% Cell type:code id:loose-throw tags:
``` python
pop_stat_median = pd.DataFrame(data={"Median": [total_number_of_records["Number of Records"].median()]})
pop_stat_median.to_csv(output_dir+"/"+"Population statistics median")
pop_stat_median
```
%% Cell type:code id:metropolitan-keeping tags:
%% Cell type:code id:neither-coating tags:
``` python
pop_stat_mode = total_number_of_records['Number of Records'].mode()
pop_stat_mode.to_csv(output_dir+"/"+"Population statistics mode")
pop_stat_mode
```
%% Cell type:markdown id:middle-hamilton tags:
%% Cell type:markdown id:banner-criticism tags:
### Total number of images per page
%% Cell type:code id:distinguished-stranger tags:
%% Cell type:code id:lesbian-angel tags:
``` python
query = """SELECT wiki AS Wiki, page_id as `Page ID`, COUNT(*) as `Number of Images`
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id
ORDER BY wiki, page_id"""
total_number_of_images_per_page = spark.sql(query).toPandas()
```
%% Cell type:code id:adopted-mexican tags:
%% Cell type:code id:polar-click tags:
``` python
total_number_of_images_per_page.to_csv(output_dir+"/"+"Total number of images per page")
total_number_of_images_per_page
```
%% Cell type:markdown id:fifty-motel tags:
%% Cell type:markdown id:front-ratio tags:
#### Breakdown of the number of images being suggested for each page
%% Cell type:markdown id:deluxe-father tags:
%% Cell type:markdown id:awful-stuart tags:
Keep in mind that pages without an image suggestion will appear as 1; the sketch after the query below shows why.
%% Cell type:code id:accomplished-leather tags:
%% Cell type:code id:neither-emphasis tags:
``` python
query = """SELECT number_of_images AS `Image Suggestions`, COUNT(*) AS `Pages`
FROM (
SELECT wiki, page_id, COUNT(*) as number_of_images
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id
) AS expr_qry
GROUP BY number_of_images
ORDER BY number_of_images"""
breakdown_of_image_sug_per_page = spark.sql(query).toPandas()
```
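A hedged pandas sketch of why suggestion-less pages surface as 1 here: the transform in this commit keeps one row (with a null image) for every page without recommendations, so the per-page `COUNT(*)` above can never be 0. The toy frame is illustrative, not real data.

``` python
import pandas as pd

# Illustrative only: one page with two suggestions, one with none.
# The suggestion-less page still has a single row with a null image,
# so COUNT(*) GROUP BY page_id reports 1 for it, never 0.
toy = pd.DataFrame({
    "page_id": [1, 1, 2],
    "image_id": ["image2.jpg", "image3.jpg", None],
})
print(toy.groupby("page_id").size())  # page 1 -> 2, page 2 -> 1
```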
%% Cell type:code id:undefined-childhood tags:
%% Cell type:code id:assisted-startup tags:
``` python
breakdown_of_image_sug_per_page.set_index('Image Suggestions', inplace=True)
breakdown_of_image_sug_per_page.to_csv(output_dir+"/"+"Breakdown of image sug per page")
breakdown_of_image_sug_per_page
```
%% Cell type:code id:dynamic-jacket tags:
%% Cell type:code id:complicated-delay tags:
``` python
breakdown_of_image_sug_per_page.plot(y="Pages",
                                     title="Breakdown of Image Suggestions Per Page",
autopct="%.2f",
figsize=(6, 6),
kind="pie");
```
%% Cell type:markdown id:excessive-intelligence tags:
%% Cell type:markdown id:downtown-manner tags:
Breakdown of image suggestion data by confidence rating.
A rating of None indicates that the page has no image suggestion; a sketch of the rating-to-confidence mapping follows the query below.
%% Cell type:code id:filled-dutch tags:
%% Cell type:code id:generic-priority tags:
``` python
query = """SELECT wiki AS Wiki, confidence_rating AS `Confidence Rating`, COUNT(*) AS `Image Suggestions`
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY Wiki, `Confidence Rating`
ORDER BY Wiki, `Confidence Rating`"""
breakdown_of_image_sug_by_confidence_score = spark.sql(query).toPandas()
```
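For reference, a sketch of how `confidence_rating` is derived from the numeric `rating` in transform.py (visible further down in this diff). Only the rating-1 branch ("high") survives in the hunk shown; the 2 -> "medium" and 3 -> "low" branches are assumptions inferred from the test fixtures.

``` python
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# First branch is verbatim from transform.py; the rest is assumed.
confidence_rating = (
    F.when(F.col("rating").cast(IntegerType()) == 1, F.lit("high"))
    .when(F.col("rating").cast(IntegerType()) == 2, F.lit("medium"))  # assumption
    .when(F.col("rating").cast(IntegerType()) == 3, F.lit("low"))     # assumption
)
# Pages with no suggestion keep a null rating, hence the None bucket above.
```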
%% Cell type:code id:effective-thomson tags:
%% Cell type:code id:impressive-failure tags:
``` python
breakdown_of_image_sug_by_confidence_score.to_csv(output_dir+"/"+"Breakdown of image sug by conf rating")
breakdown_of_image_sug_by_confidence_score
```
%% Cell type:markdown id:cultural-defeat tags:
%% Cell type:markdown id:executive-theory tags:
#### Get articles with more than 3 image suggestions
Assuming no errors, this table should be empty.
%% Cell type:code id:compressed-brooks tags:
%% Cell type:code id:fiscal-poverty tags:
``` python
query = """WITH large_image_sug AS
(SELECT wiki, page_id, COUNT(*)
FROM gmodena.imagerec_prod
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki, page_id
HAVING COUNT(*) > 3)
SELECT p.*
FROM gmodena.imagerec_prod p
JOIN large_image_sug
ON large_image_sug.wiki = p.wiki
AND large_image_sug.page_id = p.page_id
AND p.snapshot='"""+snapshot+"""'
ORDER BY p.wiki, p.page_id, p.image_id"""
articles_with_more_image_sug = spark.sql(query).toPandas()
```
%% Cell type:code id:happy-navigator tags:
%% Cell type:code id:metallic-visibility tags:
``` python
articles_with_more_image_sug.to_csv(output_dir+"/"+"Articles with more than 3 sug")
articles_with_more_image_sug
```
%% Cell type:markdown id:dental-bennett tags:
%% Cell type:markdown id:invalid-trader tags:
### Size and counts of intermediate and final datasets
%% Cell type:code id:complete-glossary tags:
%% Cell type:code id:integrated-spell tags:
``` python
query = """SELECT wiki_db AS `Wiki`, snapshot, COUNT(*) AS `Raw Number of Records`
FROM gmodena.imagerec
WHERE snapshot='"""+snapshot+"""'
GROUP BY wiki_db, snapshot
ORDER BY wiki_db"""
raw_total_number_of_records = spark.sql(query).toPandas()
```
%% Cell type:code id:numerical-bryan tags:
%% Cell type:code id:apparent-marble tags:
``` python
raw_total_number_of_records
```
%% Cell type:code id:packed-counter tags:
%% Cell type:code id:aquatic-selling tags:
``` python
total_number_of_records = total_number_of_records.rename(columns={"Number of Records": "Final Number of Records"})
result = pd.merge(raw_total_number_of_records, total_number_of_records, on=["Wiki", "snapshot"])
```
%% Cell type:code id:instrumental-species tags:
%% Cell type:code id:supreme-monday tags:
``` python
result.to_csv(output_dir+"/"+"Counts of intermediate and final datasets")
result
```
%% Cell type:code id:modern-productivity tags:
%% Cell type:code id:green-intellectual tags:
``` python
result.plot(x="Wiki",
y=["Raw Number of Records", "Final Number of Records"],
title="Comparison of intermediate and final number of records",
figsize=(6, 6),
kind="bar")
```
%% Cell type:code id:worse-fleece tags:
%% Cell type:markdown id:supposed-nigeria tags:
### Number of articles with and without valid "instance of"
TODO: update the snapshot and table name to be passed in as parameters.
%% Cell type:code id:regulation-rental tags:
``` python
query = """SELECT wiki_db, snapshot,
COUNT(instance_of) AS with_instance_of,
SUM(CASE WHEN instance_of IS NULL THEN 1 ELSE 0 END) AS without_instance_of
FROM gmodena.imagerec_parquet
WHERE snapshot = '2021-01'
GROUP BY wiki_db, snapshot
ORDER BY wiki_db"""
instance_of_metrics = spark.sql(query).toPandas()
```
%% Cell type:code id:offensive-underwear tags:
``` python
instance_of_metrics.to_csv(output_dir+"/"+"Number of articles with and without valid instance_of")
```
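A small added check, not part of the commit: since `COUNT(instance_of)` skips NULLs, the two columns should sum to each wiki's total row count for the snapshot.

``` python
# Hedged sanity check on the frame above: with + without = total rows.
checked = instance_of_metrics.assign(
    total=instance_of_metrics["with_instance_of"]
    + instance_of_metrics["without_instance_of"]
)
checked
```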
......
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import Column, DataFrame
from pyspark.sql import functions as F
from schema import CsvDataset
import argparse
import sys
import uuid
import datetime
spark = SparkSession.builder.getOrCreate()
......@@ -46,4 +41,3 @@ if __name__ == "__main__":
).parquet(
destination
) # Requires dynamic partitioning enabled
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.types import StructType, StringType
class CsvDataset:
......@@ -17,4 +17,5 @@ class RawDataset(CsvDataset):
schema = CsvDataset.schema.add("wiki_db", StringType(), True).add(
"snapshot", StringType(), True
)
recommendation_schema = "array<struct<image:string,note:string,rating:double>>"
top_candidates_schema = "array<struct<image:string,note:string,rating:double>>"
instance_of_schema = "struct<`entity-type`:string,`numeric-id`:bigint,id:string>"
from pyspark.sql import SparkSession
from pyspark.sql import Column, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.types import IntegerType
from schema import RawDataset
import argparse
......@@ -11,8 +11,6 @@ import datetime
spark = SparkSession.builder.getOrCreate()
class ImageRecommendation:
confidence_rating: Column = (
F.when(F.col("rating").cast(IntegerType()) == 1, F.lit("high"))
......@@ -33,6 +31,16 @@ class ImageRecommendation:
)
)
instance_of: Column = (
F.when(
F.col("instance_of").isNull(),
F.lit(None)
)
.otherwise(
F.from_json("instance_of", RawDataset.instance_of_schema).getItem("id")
)
)
def __init__(self, dataFrame: DataFrame):
self.dataFrame = dataFrame
if not dataFrame.schema == RawDataset.schema:
......@@ -46,7 +54,7 @@ class ImageRecommendation:
.withColumn(
"data",
F.explode(
F.from_json("top_candidates", RawDataset.recommendation_schema)
F.from_json("top_candidates", RawDataset.top_candidates_schema)
),
)
.select("*", "data.image", "data.rating", "data.note")
......@@ -61,6 +69,7 @@ class ImageRecommendation:
"image_id",
"confidence_rating",
"source",
"instance_of",
)
)
without_recommendations = (
......@@ -76,16 +85,20 @@ class ImageRecommendation:
"image_id",
"confidence_rating",
"source",
"instance_of",
)
)
return with_recommendations.union(without_recommendations)
return with_recommendations.union(without_recommendations).withColumn("instance_of", self.instance_of)
def parse_args():
parser = argparse.ArgumentParser(description='Transform raw algo output to production datasets')
parser.add_argument('--snapshot', help='Monthly snapshot date (YYYY-MM)')
parser.add_argument('--source', help='Source dataset path')
parser.add_argument('--destination', help='Destination path')
parser.add_argument('--dataset-id', help='Production dataset identifier (optional)', default=str(uuid.uuid4()), dest='dataset_id')
parser.add_argument('--dataset-id', help='Production dataset identifier (optional)', default=str(uuid.uuid4()),
dest='dataset_id')
return parser.parse_args()
......@@ -101,7 +114,9 @@ if __name__ == "__main__":
num_partitions = 1
df = (
spark.read.parquet(source)
spark.read
.schema(RawDataset.schema)
.parquet(source)
)
insertion_ts = datetime.datetime.now().timestamp()
(
......@@ -112,7 +127,8 @@ if __name__ == "__main__":
.withColumn("snapshot", F.lit(snapshot))
.sort(F.desc("page_title"))
.coalesce(num_partitions)
.write
.partitionBy("wiki", "snapshot")
.mode('overwrite') # Requires dynamic partitioning enabled
.csv(destination)
.parquet(destination)
)
......@@ -16,6 +16,7 @@ def test_etl(raw_data):
"page_title",
"image_id",
"confidence_rating",
"instance_of",
"source",
}
)
......@@ -47,3 +48,14 @@ def test_etl(raw_data):
.count()
== 0
)
# instance_of JSON is parsed correctly
expected_instance_of = "Q577"
rows = (
ddf.where(F.col("instance_of").isNotNull())
.select("instance_of")
.distinct()
.collect()
)
assert len(rows) == 1
assert rows[0]["instance_of"] == expected_instance_of