# Authors
# Clara Andrew-Wani 2021 (https://github.com/clarakosi/ImageMatching/blob/airflow/etl.py).
from datetime import timedelta, datetime
from airflow import DAG

from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.papermill_operator import PapermillOperator

import os
import uuid
import getpass
import configparser

default_args = {
    "owner": getpass.getuser(), # User running the job (default_user: airflow)
    "run_as_owner": True,
    "depends_on_past": False,
    "email": ["image-suggestion-owners@wikimedia.org"], # TODO: this is just an example. Set to an existing address
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "start_date": days_ago(1),
}

with DAG(
    "image-suggestion-etl-pipeline",
    tags=["image-suggestion", "experimental"],
    default_args=default_args,
    concurrency=3,
    schedule_interval=None,
    catchup=True
) as dag:

    image_suggestion_dir = os.environ.get("IMAGE_SUGGESTION_DIR", '/srv/airflow-platform_eng/image-matching/')
    # TODO: Undo hardcode, use airflow generated run id
    run_id = '8419345a-3404-4a7c-93e1-b9e6813706ff'
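    # A possible replacement (untested sketch): Airflow exposes its own run id at
    # runtime as the Jinja template "{{ run_id }}" (or dag_run.run_id inside a
    # callable). Because run_id is also used below at DAG parse time to create
    # local directories, switching to the templated value would require moving
    # directory creation into a task (e.g. a "mkdir -p" BashOperator) instead of
    # doing it at import time.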
    # Snapshot date. It can be overridden at trigger time via dag_run.conf['snapshot'],
    # but the Jinja template is only rendered inside templated operator fields; values
    # needed at DAG parse time (monthly_snapshot, the local output directories) fall
    # back to the default below.
    default_snapshot = '2021-09-08'
    snapshot = f"{{{{ dag_run.conf['snapshot'] or '{default_snapshot}' }}}}"
    monthly_snapshot = datetime.fromisoformat(default_snapshot).strftime('%Y-%m')
    username = getpass.getuser()
    hive_user_db = 'analytics_platform_eng'
    config = configparser.ConfigParser()
    ima_home = '/srv/airflow-platform_eng/image-matching'
    # config.read(f'{image_suggestion_dir}/conf/wiki.conf')
    # wikis = config.get("poc_wikis", "target_wikis")
    # wikis = wikis.split()
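    # Hypothetical sketch of what conf/wiki.conf would need to contain for the
    # commented-out read above to work (the actual file is not shown here):
    #
    # [poc_wikis]
    # target_wikis = kowiki plwiki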
    wikis = ['kowiki', 'plwiki']

    # Create directories for pipeline
    algo_outputdir = os.path.join(image_suggestion_dir, f'runs/{run_id}/Output')
    outputdir = os.path.join(image_suggestion_dir, f'runs/{run_id}/imagerec_prod_{default_snapshot}')
    tsv_tmpdir = os.path.join(image_suggestion_dir, f'runs/{run_id}/tmp')

    # These directories are created on the Airflow host at DAG parse time.
    for directory in (algo_outputdir, outputdir, tsv_tmpdir):
        os.makedirs(directory, exist_ok=True)

    # Generate spark config
    spark_config = f'{image_suggestion_dir}/runs/{run_id}/regular.spark.properties'

    generate_spark_config = BashOperator(
        task_id='generate_spark_config',
        bash_command=f'cat {image_suggestion_dir}/conf/spark.properties.template /usr/lib/spark2/conf/spark-defaults.conf > {spark_config}'
    )

    # TODO: Look into SparkSubmitOperator
    generate_placeholder_images = BashOperator(
        task_id='generate_placeholder_images',
        bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON={ima_home}/venv/bin/python spark2-submit --properties-file {spark_config} --archives {ima_home}/venv.tar.gz#venv {ima_home}/venv/bin/placeholder_images.py {snapshot}'
    )
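
    # A possible SparkSubmitOperator equivalent for the task above (untested sketch;
    # assumes the apache-spark provider is installed and a 'spark_default' connection
    # points at the cluster):
    #
    # from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    #
    # generate_placeholder_images = SparkSubmitOperator(
    #     task_id='generate_placeholder_images',
    #     application=f'{ima_home}/venv/bin/placeholder_images.py',
    #     application_args=[snapshot],
    #     archives=f'{ima_home}/venv.tar.gz#venv',
    #     env_vars={'PYSPARK_PYTHON': './venv/bin/python',
    #               'PYSPARK_DRIVER_PYTHON': f'{ima_home}/venv/bin/python'},
    #     conn_id='spark_default',
    # )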

    # Update hive external table metadata
    update_imagerec_table = BashOperator(
        task_id='update_imagerec_table',
        bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -f {image_suggestion_dir}/sql/external_imagerec.hql'
    )



    for wiki in wikis:
        algo_run = BashOperator(
            task_id=f'run_algorithm_for_{wiki}',
            bash_command=f'PYSPARK_PYTHON=./venv/bin/python PYSPARK_DRIVER_PYTHON={ima_home}/venv/bin/python spark2-submit --properties-file {spark_config} --archives {ima_home}/venv.tar.gz#venv {ima_home}/venv/bin/algorithm.py {snapshot} {wiki}'
        )

        # Sensor for finished algo run
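        # Note: FileSensor polls through the default 'fs_default' filesystem
        # connection, so this assumes the algorithm output lands on a path visible
        # to the Airflow workers.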
        raw_dataset_sensor = FileSensor(
            task_id=f'wait_for_{wiki}_raw_dataset',
            poke_interval=60,
            filepath=os.path.join(
                algo_outputdir, f'{wiki}_{snapshot}_wd_image_candidates.tsv'
            ),
            dag=dag,
        )

        # Upload raw data to HDFS
        hdfs_imagerec = f'/user/{username}/imagerec'
        spark_master_local = 'local[2]'
        upload_imagerec_to_hdfs = BashOperator(
            task_id=f'upload_{wiki}_imagerec_to_hdfs',
            bash_command=f'spark2-submit --properties-file {spark_config} --master {spark_master_local} \
                            --files {image_suggestion_dir}/spark/schema.py \
                            {image_suggestion_dir}/spark/raw2parquet.py \
                            --wiki {wiki} \
                            --snapshot {monthly_snapshot} \
                            --source file://{algo_outputdir}/{wiki}_{snapshot}_wd_image_candidates.tsv \
                            --destination {hdfs_imagerec}/'
        )

        # Link tasks
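        # update_imagerec_table fans in from every wiki's upload task, so the
        # production steps defined after this loop only start once all per-wiki
        # uploads have completed.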
        generate_spark_config >> generate_placeholder_images >> algo_run >> raw_dataset_sensor >> upload_imagerec_to_hdfs >> update_imagerec_table

    # Generate production data
    hdfs_imagerec_prod = f'/user/{username}/imagerec_prod'
    generate_production_data = BashOperator(
        task_id='generate_production_data',
        bash_command=f'spark2-submit --properties-file {spark_config} --files {image_suggestion_dir}/spark/schema.py \
                    {image_suggestion_dir}/spark/transform.py \
                    --snapshot {monthly_snapshot} \
                    --source {hdfs_imagerec} \
                    --destination {hdfs_imagerec_prod} \
                    --dataset-id {run_id}'
    )

    # Update hive external production metadata
    update_imagerec_prod_table = BashOperator(
        task_id='update_imagerec_prod_table',
        bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -f {image_suggestion_dir}/sql/external_imagerec_prod.hql'
    )

    for wiki in wikis:

        # Export production datasets
        export_prod_data = BashOperator(
            task_id=f'export_{wiki}_prod_data',
            bash_command=f'hive -hiveconf username={username} -hiveconf database={hive_user_db} -hiveconf output_path={tsv_tmpdir}/{wiki}_{monthly_snapshot} -hiveconf wiki={wiki} -hiveconf snapshot={monthly_snapshot} -f {image_suggestion_dir}/sql/export_prod_data.hql > {tsv_tmpdir}/{wiki}_{monthly_snapshot}_header'
        )

        # Sensor for production data
        production_dataset_sensor = FileSensor(
            task_id=f'wait_for_{wiki}_production_dataset',
            poke_interval=60,
            filepath=f'{tsv_tmpdir}/{wiki}_{monthly_snapshot}_header',
            dag=dag,
        )

        # Append header
        append_tsv_header = BashOperator(
            task_id=f'append_{wiki}_tsv_header',
            bash_command=f'cat {tsv_tmpdir}/{wiki}_{monthly_snapshot}_header {tsv_tmpdir}/{wiki}_{monthly_snapshot}/* > {outputdir}/prod-{wiki}-{snapshot}-wd_image_candidates.tsv'
        )

        # Link tasks
        update_imagerec_table >> generate_production_data >> update_imagerec_prod_table
        update_imagerec_prod_table >> export_prod_data
        export_prod_data >> production_dataset_sensor >> append_tsv_header