from typing import Any, Dict, Optional, Union

from airflow.exceptions import AirflowFailException
from airflow.providers.apache.spark.operators.spark_submit import (
    SparkSubmitOperator as AirflowSparkSubmitOperator,
)

from wmf_airflow_common.hooks.spark import (
    SparkSubmitHook,
    SparkSkeinSubmitHook,
    kwargs_for_virtualenv
)
from wmf_airflow_common.util import resolve_kwargs_default_args


class SparkSubmitOperator(AirflowSparkSubmitOperator):
    """
    A SparkSubmitOperator that uses wmf_airflow_common SparkSubmitHook to add
    extra features (like launching via Skein) that are not included with the
    built-in Airflow SparkSubmitOperator.
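
    Example (a minimal sketch; the task_id, application, and java_class values
    below are illustrative, not taken from this repository):
    ::
        op = SparkSubmitOperator(
            task_id='my_spark_job',
            application='hdfs:///path/to/my_job.jar',
            java_class='org.example.MyJob',
            master='yarn',
            deploy_mode='cluster',
            launcher='skein',
        )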
    """

    template_fields = AirflowSparkSubmitOperator.template_fields + (
        '_command_preamble',
        '_command_postamble',
        '_driver_java_options',
    )

    """
    List of available values for the launcher param.
    """
    available_launchers = ('local', 'skein')

    def __init__(
        self,
        launcher: str = 'local',
        avoid_local_execution: bool = False,
        command_preamble: Optional[str] = None,
        command_postamble: Optional[str] = None,
        driver_cores: Optional[int] = None,
        driver_java_options: Optional[str] = None,
        master: Optional[str] = 'local',
        deploy_mode: Optional[str] = None,
        queue: Optional[str] = None,
        spark_home: Optional[str] = None,
        skein_master_log_level: Optional[str] = None,
        skein_client_log_level: Optional[str] = None,
        skein_app_log_collection_enabled: bool = True,

        conn_id: Optional[str] = None,  # overridden here to change default value
        name: str = '{{ task_instance_key_str }}',  # overridden here to change default value

        # overridden here to support dict k,v pairs.
        application_args: Optional[Union[list, dict]] = None,

        **kwargs: Any,  # Rest of the kwargs for the provided Airflow SparkSubmitOperator
    ):
        """
        Initializes a SparkSubmitOperator.  For parameter documentation, see
        wmf_airflow_common SparkSubmitHook and the provided Airflow SparkSubmitHook.

        :param launcher:
            Launcher to use to run the spark-submit command. The only supported
            launchers are 'local' (default) and 'skein'.
            If 'local', spark-submit will be run locally via our
            SparkSubmitHook.
            If 'skein', spark-submit will be run via our SparkSkeinSubmitHook.
            Default: 'local'

        :param command_preamble:
            Command to prefix before the spark-submit command.
            This only works when launcher=skein, else it is ignored.

        :param command_postamble:
            Command to append after the spark-submit command.
            This only works when launcher=skein, else it is ignored.

        :param avoid_local_execution:
            If True, this will raise an AirflowFailException if we would have
            run spark-submit in a way that would cause local Spark work to
            happen. That is, either launcher must be 'skein', or deploy_mode
            must be 'cluster'.
            Default: False.

        :param skein_master_log_level:
            Log level for the Skein YARN AppMaster.
            Only used if launcher='skein'.

        :param skein_client_log_level:
            Log level for the Skein Client.
            Only used if launcher='skein'.

        :param skein_app_log_collection_enabled:
            Whether YARN AppMaster logs should be collected and logged locally
            after the app finishes.
            Only used if launcher='skein'.

        :param application_args:
            If given as a dict of k=v pairs, it will be converted to a flat list.
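
        Example (a minimal sketch; task_id, application, and args are illustrative):
        ::
            SparkSubmitOperator(
                task_id='my_pyspark_job',
                application='hdfs:///path/to/my_job.py',
                # This dict will be flattened to
                # ['--year', '2024', '--month', '5'].
                application_args={'--year': '2024', '--month': '5'},
            )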
        """

        self._launcher = launcher
        self._command_preamble = command_preamble
        self._command_postamble = command_postamble
        self._driver_cores = driver_cores
        self._driver_java_options = driver_java_options
        self._master = master
        self._deploy_mode = deploy_mode
        self._queue = queue
        self._spark_home = spark_home
        self._skein_master_log_level = skein_master_log_level
        self._skein_client_log_level = skein_client_log_level
        self._skein_app_log_collection_enabled = skein_app_log_collection_enabled

        if self._launcher not in self.available_launchers:
            raise AirflowFailException(
                f'launcher must be one of {",".join(self.available_launchers)}, '
                f'was: {self._launcher}'
            )

        if avoid_local_execution and self._launcher != 'skein' and self._deploy_mode != 'cluster':
            raise AirflowFailException(
                'Avoiding local Spark execution. '
                'Please use either launcher=\'skein\' or deploy_mode=\'cluster\'.'
            )

        # Support application_args being provided as a dict.
        # Parent SparkSubmitOperator wants them as a list.
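        # e.g. {'--year': '2024', '--month': '5'} becomes
        # ['--year', '2024', '--month', '5'].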
        if isinstance(application_args, dict):
            application_args_list = []
            for key, val in application_args.items():
                application_args_list += [key, val]
            application_args = application_args_list

        super().__init__(
            # These need to be passed explicitly
            # since we override their default values.
            conn_id=conn_id,
            name=name,
            application_args=application_args,
            # The rest of parent Airflow SparkSubmitOperator kwargs can be passed like this.
            **kwargs
        )

    def _get_hook(self) -> SparkSubmitHook:
        """
        Overrides the provided Aiflow SparkSubmitHook
        to use our custom SparkSubmitHook.

        :return: a SparkSubmitHook instance
        :rtype: wmf_airflow_common.hooks.SparkSubmitHook
        """

        hook_kwargs = {
            'application': self._application,
            'driver_cores': self._driver_cores,
            'driver_java_options': self._driver_java_options,
            'master': self._master,
            'deploy_mode': self._deploy_mode,
            'queue': self._queue,
            'spark_home': self._spark_home,
            'conf': self._conf,
            'conn_id': self._conn_id,
            'files': self._files,
            'py_files': self._py_files,
            'archives': self._archives,
            'driver_class_path': self._driver_class_path,
            'jars': self._jars,
            'java_class': self._java_class,
            'packages': self._packages,
            'exclude_packages': self._exclude_packages,
            'repositories': self._repositories,
            'total_executor_cores': self._total_executor_cores,
            'executor_cores': self._executor_cores,
            'executor_memory': self._executor_memory,
            'driver_memory': self._driver_memory,
            'keytab': self._keytab,
            'principal': self._principal,
            'proxy_user': self._proxy_user,
            'name': self._name,
            'num_executors': self._num_executors,
            'status_poll_interval': self._status_poll_interval,
            'application_args': self._application_args,
            'env_vars': self._env_vars,
            'verbose': self._verbose,
            'spark_binary': self._spark_binary,
        }

        if self._launcher == 'skein':
            hook_kwargs.update({
                'command_preamble': self._command_preamble,
                'command_postamble': self._command_postamble,
                'skein_master_log_level': self._skein_master_log_level,
                'skein_client_log_level': self._skein_client_log_level,
                'skein_app_log_collection_enabled': self._skein_app_log_collection_enabled,
            })
            hook = SparkSkeinSubmitHook(**hook_kwargs)
        else:
            hook = SparkSubmitHook(**hook_kwargs)

        return hook

    @classmethod
    def for_virtualenv(
        cls,
        virtualenv_archive: str,
        entry_point: str,
        use_virtualenv_python: bool = True,
        use_virtualenv_spark: bool = False,
        **kwargs: Any,
    ) -> 'SparkSubmitOperator':
        """
        Factory method for using a SparkSubmitOperator with a virtualenv.
        This should work with both Conda and regular virtualenvs.

        Example:
        ::
            op = SparkSubmitOperator.for_virtualenv(
                # This will by default be unpacked on the
                # workers as 'venv'.  You can change this by setting
                # an '#my_alias_here' suffix on this archive url.
                virtualenv_archive = 'hdfs:///path/to/my_conda_env.tgz#env_alias',

                # Path to the pyspark job file you want to execute, relative
                # to the archive.  (Note that this must end with '.py' for spark-submit
                # to realize it is a pyspark job.)
                entry_point = 'bin/my_pyspark_job.py'

                # This causes PYSPARK_PYTHON, etc. to be set to <alias>/bin/python
                use_virtualenv_python = True,

                # This causes spark_binary to be set to <alias>/bin/spark-submit.
                # Only set this if you include pyspark as a dependency package in your venv.
                use_virtualenv_spark = True,
            )

        :param virtualenv_archive:
            URL to the archive virtualenv file. If no '#alias' suffix is provided,
            The unpacked alias will be 'venv'.

        :param entry_point:
            This should be a relative path to your spark job file
            inside of the virtualenv_archive.

        :param use_virtualenv_python:
            Whether python in side of the venv should be used.
            defaults to True

        :param use_virtualenv_spark:
            Whether bin/spark-submit inside of the venv should be used.
            defaults to False.
            Note that if you set this to true, a spark_home and spark_binary
            kwargs will be overriden.

        Note that skein will be the default 'launcher' if not explicitly set in kwargs.

        :return: SparkSubmitOperator
        """

        # Default launcher=skein.
        # This will usually be what is needed to use a virtualenv archive,
        # as it is not expected that an unpacked virtualenv archive will be
        # available locally.
        if 'launcher' not in kwargs:
            kwargs['launcher'] = 'skein'

        # NOTE: This uses the kwargs_for_virtualenv helper from
        # wmf_airflow_common.hooks.spark to construct a SparkSubmitOperator
        # for a virtualenv.  This is done so that SparkSubmitHook itself can
        # use the same helper.  It ONLY works here because SparkSubmitHook and
        # SparkSubmitOperator use the same kwarg names, as SparkSubmitOperator
        # is kind of a proxy for SparkSubmitHook.
        spark_submit_kwargs = kwargs_for_virtualenv(
            virtualenv_archive=virtualenv_archive,
            entry_point=entry_point,
            use_virtualenv_python=use_virtualenv_python,
            use_virtualenv_spark=use_virtualenv_spark,
            **kwargs
        )

        return cls(**spark_submit_kwargs)


class SparkSqlOperator(SparkSubmitOperator):
    """
    Executes a query in SparkSQL using
    org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver (by default).
    Doing this avoids shelling out to spark-sql,
    allowing us to subclass our SparkSubmitOperator.

    This class attempts to keep the same interface as the built-in Airflow
    SparkSqlOperator, even though it does not extend from it.
    It adds a query_parameters param to aid in correctly populating
    application_args with -d k=v query parameters to SparkSQLCLIDriver.

    However, Airflow templated .sql/.hql files are not supported.
    WMF tries to keep job code separate from scheduler code, so
    .sql files could be anywhere (perhaps already in HDFS!),
    and Airflow is not able to read the .sql file to render it as
    a template.
    To use Airflow template values in your SQL file, use them in
    query_parameters instead, and make sure your SQL file expects
    query variables to be defined that way, instead of using Airflow templates
    directly in your SQL.

    Note that if you change java_class (perhaps to use your own version
    of SparkSQLCLIDriver like we do at WMF), you MUST set the application
    param to the jar containing your class. SparkSQLCLIDriver is a 'special class'
    according to spark-submit; it will allow spark-submit to work without
    an 'application' set, but another implementation won't.
    See: Apache Spark source code SparkSubmitCommandBuilder.java specialClasses.
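
    Example (a minimal sketch; the task_id, file path, and parameter values
    are illustrative, not taken from this repository):
    ::
        op = SparkSqlOperator(
            task_id='compute_my_table',
            sql='hdfs:///path/to/my_query.hql',
            # These become '-d k=v' arguments to SparkSQLCLIDriver.
            query_parameters={
                'destination_table': 'my_db.my_table',
                'snapshot': '{{ ds }}',
            },
        )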
    """

    spark_sql_driver_class = 'org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver'
    """
    Apache Spark class SparkSQLCLIDriver. Default value of java_class if not set.
    """

    template_fields = SparkSubmitOperator.template_fields + (
        '_sql',
        '_query_parameters'
    )

    def __init__(
        self,
        sql: str,
        query_parameters: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ):
        """
        :param sql:
            The SQL query to execute, or the path to a .sql or .hql file. (templated)

        :param query_parameters:
            Dict of query parameters to pass in application_args as '-d'. (templated)

        :param **kwargs:
            kwargs to pass to parent SparkSubmitOperator.
            kwargs['application_args'] is modified to use sql and query_parameters,
            so you probably shouldn't set this kwarg.

        """
        self._sql = sql
        self._query_parameters = query_parameters

        # Set default_args in kwargs if it doesn't exist to use it later
        if 'default_args' not in kwargs:
            kwargs['default_args'] = {}

        # Only set a default java_class if no default is set
        if 'java_class' not in kwargs['default_args']:
            kwargs['default_args']['java_class'] = self.spark_sql_driver_class

        # If using a custom java_class, make sure application is set.
        final_java_class = resolve_kwargs_default_args(kwargs, 'java_class')
        final_application = resolve_kwargs_default_args(kwargs, 'application')
        if final_java_class != self.spark_sql_driver_class and final_application is None:
            raise AirflowFailException(
                'If using a custom java_class with SparkSqlOperator, '
                'the application parameter must be set.'
            )

        # NOTE: This works since application_args is not set in default_args.
        if 'application_args' not in kwargs:
            kwargs['application_args'] = []

        # Pass the sql parameter as either the -f or the -e application_args.
        self._sql = self._sql.strip()
        if self._sql.endswith(".sql") or self._sql.endswith(".hql"):
            kwargs['application_args'] += ["-f", self._sql]
        else:
            kwargs['application_args'] += ["-e", self._sql]

        # Pass query_parameters as -d k=v application_args.
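        # e.g. query_parameters={'snapshot': '2024-01-01'} results in
        # application_args += ['-d', 'snapshot=2024-01-01'].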
        if self._query_parameters:
            for key, val in self._query_parameters.items():
                # NOTE: does {val} need to be quoted?
                # This will fail if val has spaces in it.
                kwargs['application_args'] += ['-d', f'{key}={val}']

        super().__init__(
            **kwargs,
        )