Skip to content

fr_pending_hourly

KCVelaga requested to merge kcvelaga/data-pipelines:fr_pending into main

Bug: T362615

The MR adds two file

  • Table creation: create_fr_pending_hourly_table.hql (tested)
  • Hourly calculation query: calculate_fr_pending_hourly.sql
    • This is slightly different from the recommended testing, as this will be run using MySQL
    • Deletion / insertion will be part of the DAG script.
    • Here is an example code snippet of how the query will be used within a DAG script (which is inspired from this approach)
for wiki_db in ['dewiki', 'enwiki', 'idwiki', 'ruwiki']:
    
    (host, port) = get_mariadb_host_port_for_wikidb(wiki_db)
    
    fr_pending_jdbc = (
        spark_session
        .read
        .format('jdbc')
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option('numPartitions', 1)
        .option("url", f"jdbc:mysql://{host}:{port}/{wiki_db}")
        .option("query", query.format(source_table='flaggedpage_pending', wiki_db=wiki_db))
        .option("user", "research")
        .option("password", pw)
        .load()          
    )
    
    fr_pending_jdbc = sparkDF_convert_dtypes(fr_pending_jdbc)
    
    partition_values = fr_pending_jdbc.select('wiki_db', 'date', 'hour').distinct().collect()[0]
    
    spark_session.sql(f"""
        DELETE FROM kcvelaga.fr_pending_hourly
        WHERE
            date = TO_DATE('{partition_values['date'].strftime('%Y-%m-%d')}', 'yyyy-MM-dd')
            AND hour  = {partition_values['hour']}
            AND wiki_db = '{partition_values['wiki_db']}'
    """)
    
    (
        fr_pending_jdbc
        .write
        .format('iceberg')
        .mode('append')
        .saveAsTable('kcvelaga.fr_pending_hourly')
    )
  • I have tested the query, and the results are available at kcvelaga.fr_pending_hourly

Merge request reports