Commit d34ff8ac authored by Ai-Jou Chou

Update extract_pageviews

parent 4aa93e60
Pipeline #4241 passed with stages in 3 minutes and 27 seconds
@@ -5,8 +5,7 @@ import pandas as pd # type: ignore
from itertools import chain
from knowledge_gaps.util import get_table_as_df
from article_quality.app import get_quality_scores
# TODO Aiko, add this file
# from knowledge_gaps.pageviews import extract_pageviews
from knowledge_gaps.pageviews import extract_pageviews
def get_content_feature(spark, content_feature_table, time_buckets):
@@ -213,9 +212,6 @@ if __name__ == '__main__':
default='2012-01')
parser.add_argument('--end',
default='2022-01')
parser.add_argument('--gap',
default='gender',
help='gender, sexual_orientation, region, country_code, continent, sub_continent, people, all')
parser.add_argument('--content_feature_table')
parser.add_argument('--pageview_table')
parser.add_argument('--quality_score_table')
......
import pyspark.sql.functions as F


def extract_pageviews(spark, start_date, end_date,
                      projects=None, table="wmf.pageview_hourly"):
    """Extract the number of pageviews between START_DATE and END_DATE
    for PROJECTS. Supply a smaller TABLE for faster queries during
    development.

    Parameters
    ----------
    spark : SparkSession

    start_date : datetime.datetime
        Start date for counting pageviews.

    end_date : datetime.datetime
        End date for counting pageviews.

    projects : List[str]
        List of Wikipedia project names in the format 'enwiki'. If not
        supplied, all projects are queried.

    table : str
        Table (namespaced with the database name) to query. Comes in
        handy during development when fast turnaround is important.

    Returns
    -------
    pyspark.sql.dataframe.DataFrame
        root
         |-- wiki_db: string (nullable = true)
         |-- page_title: string (nullable = true)
         |-- page_id: integer (nullable = true)
         |-- pageviews: long (nullable = true)

    """
    # pageview_hourly stores the date as separate year/month/day columns,
    # so compare against ISO-formatted date strings.
    start_date = start_date.strftime('%Y-%m-%d')
    end_date = end_date.strftime('%Y-%m-%d')
    # Derive the wiki_db name (e.g. 'enwiki') from the project hostname and
    # aggregate hourly view counts over the full date range per project and page.
    query = f"""
    SELECT
        concat(split(project, '[.]')[0], 'wiki') as wiki_db,
        page_title,
        page_id,
        sum(view_count) as pageviews
    FROM
        {table}
    WHERE
        to_date(concat(year, '-', month, '-', day))
    BETWEEN
        to_date('{start_date}')
    AND
        to_date('{end_date}')
    GROUP BY
        project, page_title, page_id
    """
    df = spark.sql(query)
    # Optionally restrict the result to the requested projects.
    if projects:
        df = df.where(F.col('wiki_db').isin(projects))
    return df
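
A minimal usage sketch of extract_pageviews for reference; the Spark session setup, dates, and project list below are illustrative assumptions, not part of this commit:

# Hypothetical usage sketch -- session, dates, and projects are assumptions.
from datetime import datetime
from pyspark.sql import SparkSession
from knowledge_gaps.pageviews import extract_pageviews

spark = SparkSession.builder.appName("pageviews-dev").getOrCreate()
views = extract_pageviews(
    spark,
    start_date=datetime(2021, 1, 1),
    end_date=datetime(2021, 1, 31),
    projects=["enwiki", "dewiki"],   # restrict to a couple of wikis
    table="wmf.pageview_hourly",     # or a smaller table during development
)
views.show(5)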
def extract_pageviews(spark, time_buckets, projects):
......