Commit 7cb80f12 authored by Clarakosi, committed by GitHub

Add Search table (#27)

* Add columns for production data export to elastic search

* Fix table creation

* Update partitioning

* Add distinct() to search table
parent 2b03bb25
-- DDL to create an external table that exposes the
-- production dataset for the search team.
-- The default HDFS location and Hive database are relative to a developer's
-- username. Example: hdfs://analytics-hadoop/user/clarakosi/search_imagerec/data.
--
-- The dataset will be available at https://superset.wikimedia.org/superset/sqllab via the
-- `presto_analytics` database.
--
-- Execution
-- hive -hiveconf username=<username> -f external_search_imagerec.hql
USE ${hiveconf:username};
CREATE EXTERNAL TABLE IF NOT EXISTS `search_imagerec`(
`wikiid` string,
`page_id` int,
`page_namespace` int,
`recommendation_type` string)
PARTITIONED BY (`year` int, `month` int, `day` int)
STORED AS PARQUET
LOCATION
'hdfs://analytics-hadoop/user/${hiveconf:username}/search_imagerec';
-- Update partition metadata
MSCK REPAIR TABLE `search_imagerec`;
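-- Illustrative example only (hypothetical partition values): once partitions are
-- registered, the table can be queried from Superset via presto_analytics, e.g.
--   SELECT wikiid, page_id, recommendation_type
--   FROM search_imagerec
--   WHERE year = 2021 AND month = 1 AND day = 1
--   LIMIT 10;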
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import argparse
spark = SparkSession.builder.getOrCreate()
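# Illustrative note: the partitioned .mode("overwrite") write below assumes dynamic
# partition overwrite is enabled (typically via the submitted properties file), e.g.
# on Spark 2.3+:
#   spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")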
def parse_args():
    parser = argparse.ArgumentParser(
        description="Transform raw algo output to production datasets"
    )
    parser.add_argument("--snapshot", help="Monthly snapshot date (YYYY-MM-DD)")
    parser.add_argument("--source", help="Source dataset path")
    parser.add_argument("--destination", help="Destination path")
    return parser.parse_args()
if __name__ == "__main__":
    args = parse_args()
    snapshot = args.snapshot.split("-")
    source = args.source
    destination = args.destination
    year = snapshot[0]
    month = snapshot[1]
    day = snapshot[2]
    num_partitions = 1
    df = spark.read.parquet(source)
    (
        df
        # Keep only article pages that actually have an image recommendation
        .where(F.col("image_id").isNotNull())
        .filter(F.col("is_article_page"))
        .withColumn("page_namespace", F.lit(0))
        .withColumn("recommendation_type", F.lit("image"))
        .withColumn("year", F.lit(year))
        .withColumn("month", F.lit(month))
        .withColumn("day", F.lit(day))
        .withColumnRenamed("wiki", "wikiid")
        .withColumn("page_id", F.col("page_id").cast("int"))
        .select(
            "wikiid",
            "page_id",
            "page_namespace",
            "recommendation_type",
            "year",
            "month",
            "day"
        )
        .distinct()
        .coalesce(num_partitions)
        .write.partitionBy("year", "month", "day")
        .mode("overwrite")  # Requires dynamic partition overwrite to be enabled
        .parquet(destination)
    )
    spark.stop()
@@ -163,5 +163,24 @@ metric_name=metrics.etl.export_prod_data.${snapshot}.seconds
timestamp=$(date +%s)
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
#7. Create search table parquet file
STARTTIME=${SECONDS}
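# The destination below should match the LOCATION declared in ddl/external_search_imagerec.hql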
hdfs_search_imagerec=/user/${username}/search_imagerec
spark2-submit --properties-file ${spark_config} etl/search_table.py \
--snapshot ${snapshot} \
--source ${hdfs_imagerec_prod} \
--destination ${hdfs_search_imagerec}
ENDTIME=${SECONDS}
metric_name=metrics.etl.search_table.${snapshot}.seconds
timestamp=$(date +%s)
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
#8. Update hive external table metadata (search_imagerec)
STARTTIME=${SECONDS}
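# ddl/external_search_imagerec.hql creates the table if needed and runs MSCK REPAIR
# to register the newly written year/month/day partitions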
hive -hiveconf username=${username} -f ddl/external_search_imagerec.hql
ENDTIME=${SECONDS}
metric_name=hive.search_imagerec.${snapshot}
timestamp=$(date +%s)
echo "${timestamp},$(($ENDTIME - $STARTTIME))" >> ${metrics_dir}/${metric_name}
echo "Export summary" echo "Export summary"
cut -f 3,4 ${outputdir}/*.tsv | sort -k 1,2 | uniq -c cut -f 3,4 ${outputdir}/*.tsv | sort -k 1,2 | uniq -c