fr_pending_hourly
Bug: T362615
The MR adds two file
- Table creation:
create_fr_pending_hourly_table.hql
(tested) - Hourly calculation query:
calculate_fr_pending_hourly.sql
- This is slightly different from the recommended testing, as this will be run using MySQL
- Deletion / insertion will be part of the DAG script.
- Here is an example code snippet of how the query will be used within a DAG script (which is inspired from this approach)
for wiki_db in ['dewiki', 'enwiki', 'idwiki', 'ruwiki']:
(host, port) = get_mariadb_host_port_for_wikidb(wiki_db)
fr_pending_jdbc = (
spark_session
.read
.format('jdbc')
.option("driver", "com.mysql.cj.jdbc.Driver")
.option('numPartitions', 1)
.option("url", f"jdbc:mysql://{host}:{port}/{wiki_db}")
.option("query", query.format(source_table='flaggedpage_pending', wiki_db=wiki_db))
.option("user", "research")
.option("password", pw)
.load()
)
fr_pending_jdbc = sparkDF_convert_dtypes(fr_pending_jdbc)
partition_values = fr_pending_jdbc.select('wiki_db', 'date', 'hour').distinct().collect()[0]
spark_session.sql(f"""
DELETE FROM kcvelaga.fr_pending_hourly
WHERE
date = TO_DATE('{partition_values['date'].strftime('%Y-%m-%d')}', 'yyyy-MM-dd')
AND hour = {partition_values['hour']}
AND wiki_db = '{partition_values['wiki_db']}'
""")
(
fr_pending_jdbc
.write
.format('iceberg')
.mode('append')
.saveAsTable('kcvelaga.fr_pending_hourly')
)
- I have tested the query, and the results are available at
kcvelaga.fr_pending_hourly