Commit b305130e authored by Gmodena's avatar Gmodena
Browse files

Merge algorithm with upstream.

parents e44f8b8e dade3b27
venv: requirements.txt
test -d venv || virtualenv --python=$(shell which python3) venv
. venv/bin/activate; pip install -Ur requirements.txt;
test: venv
. venv/bin/activate; pytest --cov etl
import pytest
from etl.transform import RawDataset
@pytest.fixture(scope="session")
def raw_data(spark_session):
return spark_session.createDataFrame(
[
(
"0",
"Q1234",
"44444",
"Some page with suggestions",
'[{"image": "image1.jpg", "rating": 2.0, "note": "image was found in the following Wikis: ruwiki"}]',
"arwiki",
"2020-12",
),
(
"1",
"Q56789",
"55555",
"Some page with no suggestion",
None,
"arwiki",
"2020-12"
)
],
RawDataset.schema,
)
...@@ -24,7 +24,8 @@ ROW FORMAT SERDE ...@@ -24,7 +24,8 @@ ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ( WITH SERDEPROPERTIES (
'field.delim'='\t', 'field.delim'='\t',
'serialization.format'='\t') 'serialization.format'='\t',
'serialization.null.format'='""')
STORED AS INPUTFORMAT STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat' 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT OUTPUTFORMAT
...@@ -32,5 +33,6 @@ OUTPUTFORMAT ...@@ -32,5 +33,6 @@ OUTPUTFORMAT
LOCATION LOCATION
'hdfs://analytics-hadoop/user/${hiveconf:username}/imagerec_prod/data'; 'hdfs://analytics-hadoop/user/${hiveconf:username}/imagerec_prod/data';
-- Update partition metadata -- Update partition metadata
MSCK REPAIR TABLE `imagerec_prod`; MSCK REPAIR TABLE `imagerec_prod`;
...@@ -52,8 +52,9 @@ class ImageRecommendation: ...@@ -52,8 +52,9 @@ class ImageRecommendation:
) )
def transform(self) -> DataFrame: def transform(self) -> DataFrame:
return ( with_recommendations = (
self.dataFrame.withColumn( self.dataFrame.where(~F.col("top_candidates").isNull())
.withColumn(
"data", "data",
F.explode( F.explode(
F.from_json("top_candidates", RawDataset.recommendation_schema) F.from_json("top_candidates", RawDataset.recommendation_schema)
...@@ -73,6 +74,22 @@ class ImageRecommendation: ...@@ -73,6 +74,22 @@ class ImageRecommendation:
"source", "source",
) )
) )
without_recommendations = (
self.dataFrame.where(F.col("top_candidates").isNull())
.withColumnRenamed("wiki_db", "wiki")
.withColumn("image_id", F.lit(None))
.withColumn("confidence_rating", F.lit(None))
.withColumn("source", F.lit(None))
.select(
"wiki",
"page_id",
"page_title",
"image_id",
"confidence_rating",
"source",
)
)
return with_recommendations.union(without_recommendations)
if __name__ == "__main__": if __name__ == "__main__":
......
pytest==6.2.2
pytest-spark==0.6.0
pytest-cov==2.10.1
\ No newline at end of file
from etl.transform import RawDataset, ImageRecommendation
def test_etl(raw_data):
assert raw_data.count() == 2
ddf = ImageRecommendation(raw_data).transform()
assert (
len(
set(ddf.columns).difference(
{
"wiki",
"page_id",
"page_title",
"image_id",
"confidence_rating",
"source",
}
)
)
== 0
)
expected_num_records = 2
assert ddf.count() == expected_num_records
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment