Commit b60c1d72 authored by Ottomata

Add a for_virtualenv factory method to SparkSubmitOperator

parent aec4fd17
@@ -28,10 +28,12 @@ Example:
from my_instance_name.config.dag_config import artifact
with DAG(...) as dag:
- t1 = SparkSubmitOperator(archives=artifact('my_proejct_conda_env-0.0.1.tgz'), ...)
+ t1 = SparkSubmitOperator(archives=artifact('my_artifact-0.0.1.tgz'))
```
- `artifact` will look up the actual URL to use for the given artifact id.
+ `artifact` will look up the actual URL to use for the given artifact name.
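For example, given the artifact name `my_artifact-0.0.1.tgz`, `artifact()` returns the cached URL configured for this instance (the URL below is illustrative only):

```python
url = artifact('my_artifact-0.0.1.tgz')
# e.g. 'hdfs:///path/to/artifact/cache/my_artifact-0.0.1.tgz' (illustrative)
```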
## wmf_airflow_common
@@ -50,6 +52,32 @@ but add additional features like:
- Launching Spark via skein
- Overriding the java_class used for SparkSql.
#### SparkSubmitOperator for conda dist envs
Our `SparkSubmitOperator` has a factory method to help construct `SparkSubmitOperator`s
for use with conda dist env archives:
```python
t1 = SparkSubmitOperator.for_virtualenv(
virtualenv_archive = artifact('my-conda-dist-env-0.1.0.tgz'),
entry_point = 'bin/my_spark_job.py',
launcher = 'skein',
# ... Other SparkSubmitOperator constructor keyword args here.
)
```
This is meant to work with conda dist env archives without having them
locally available to the Airflow Scheduler, so setting `launcher='skein'`
and using a `virtualenv_archive` in HDFS is probably what you want to do.
There is experimental support for using your own pyspark dependency
from the conda virtualenv. If you have pyspark installed in your conda
virtualenv, setting `use_virtualenv_spark=True` will cause `SparkSubmitOperator`
to set `spark_binary` to the path to `bin/spark-submit` in your virtualenv archive.
NOTE: While this works, extra configuration is needed for Spark to work with
[Hadoop](https://spark.apache.org/docs/latest/hadoop-provided.html#apache-hadoop)
and [Hive](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#hive-tables).
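For reference, a minimal sketch of enabling this (the archive name and entry point are placeholders; the extra Hadoop/Hive settings from the links above still need to be supplied, typically via the `conf` kwarg):

```python
t1 = SparkSubmitOperator.for_virtualenv(
    virtualenv_archive = artifact('my-conda-dist-env-0.1.0.tgz'),
    entry_point = 'bin/my_spark_job.py',
    launcher = 'skein',
    # Only set this if pyspark is installed in the conda env;
    # spark_binary will then point at the venv's bin/spark-submit.
    use_virtualenv_spark = True,
    # Extra Hadoop/Hive settings (see the links above) would go here, e.g.
    # conf = {...},
)
```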
### Skein Operators
The `SkeinOperator` and `SimpleSkeinOperator` can be used to launch generic
applications in YARN via Skein.
@@ -7,7 +7,11 @@ from skein import (
FileType as SkeinFileType
)
from airflow.models import Connection
- from wmf_airflow_common.hooks.spark import SparkSubmitHook, SparkSkeinSubmitHook
+ from wmf_airflow_common.hooks.spark import (
+     SparkSubmitHook,
+     SparkSkeinSubmitHook,
+     kwargs_for_virtualenv,
+ )
@pytest.fixture
@@ -105,6 +109,7 @@ def test_resolve_connection(spark_submit_hook_kwargs, expected):
'deploy_mode': 'client',
'spark_binary': '/usr/lib/spark/bin/spark-submit',
'application_args': ['--app-arg1', 1], # 1 should be cast to a string.
'env_vars': {'VAR1': 'val1'},
},
[
'/usr/lib/spark/bin/spark-submit',
@@ -112,8 +117,12 @@ def test_resolve_connection(spark_submit_hook_kwargs, expected):
'-Dmy.prop=fake',
'--driver-cores',
'2',
'--conf',
'spark.executorEnv.VAR1=val1',
'--master',
'yarn',
'--conf',
'spark.yarn.appMasterEnv.VAR1=val1',
'--name',
'fake_name',
'--queue',
@@ -242,3 +251,71 @@ def test_spark_skein_submit_hook_skein_files(spark_submit_hook_kwargs, expected_
hook = SparkSkeinSubmitHook(**spark_submit_hook_kwargs)
skein_files = hook._skein_hook._application_spec.master.files
assert skein_files == expected_skein_files
@pytest.mark.parametrize(['for_virtualenv_kwargs', 'expected'], [
pytest.param(
{
'virtualenv_archive': 'hdfs:///path/to/conda_env.tgz',
'entry_point': 'bin/my_pyspark_job.py',
'use_virtualenv_spark': True,
'archives': 'hdfs:///my_extra_archive.tgz#my_extra_archive',
'driver_cores': '2',
'driver_java_options': '-Dmy.prop=fake',
'master': 'yarn',
'queue': 'fake-queue',
'name': 'fake_name',
'deploy_mode': 'client',
'keytab': '/path/to/me.keytab',
'principal': 'me@ORG',
},
[
'venv/bin/spark-submit',
'--driver-java-options',
'-Dmy.prop=fake',
'--driver-cores',
'2',
'--conf',
'spark.executorEnv.PYSPARK_DRIVER_PYTHON=venv/bin/python',
'--conf',
'spark.executorEnv.PYSPARK_PYTHON=venv/bin/python',
'--master',
'yarn',
'--conf',
'spark.yarn.appMasterEnv.PYSPARK_PYTHON=venv/bin/python',
'--conf',
'spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=venv/bin/python',
'--archives',
'hdfs:///path/to/conda_env.tgz#venv,hdfs:///my_extra_archive.tgz#my_extra_archive',
'--keytab',
'/path/to/me.keytab',
'--principal',
'me@ORG',
'--name',
'fake_name',
'--queue',
'fake-queue',
'--deploy-mode',
'client',
'venv/bin/my_pyspark_job.py',
],
id="spark_submit_hook_using_kwargs_for_virtualenv"
),
])
def test_spark_submit_hook_kwargs_for_virtualenv(for_virtualenv_kwargs, expected):
hook = SparkSubmitHook(
**kwargs_for_virtualenv(
**for_virtualenv_kwargs
)
)
assert hook._build_spark_submit_command() == expected
def test_spark_submit_hook_kwargs_for_virtualenv_both_application_and_entry_point():
with pytest.raises(ValueError):
kwargs_for_virtualenv(
virtualenv_archive='hdfs:///path/to/conda_env.tgz',
entry_point='bin/my_pyspark_job.py',
application='/my/application/py',
)
@@ -40,6 +40,13 @@ def test_is_wmf_airflow_instance_with_env_var(monkeypatch):
monkeypatch.setenv('AIRFLOW_INSTANCE_NAME', 'analytics-test')
assert util.is_wmf_airflow_instance() is True
def test_is_relative_uri():
assert util.is_relative_uri('rel1')
assert not util.is_relative_uri('/rel1')
assert not util.is_relative_uri('file:///rel1')
def test_helpers_in_dev_environment(monkeypatch):
# Make sure AIRFLOW_INSTANCE_NAME is set so we get wmf env variants.
monkeypatch.setenv('AIRFLOW_INSTANCE_NAME', 'airflow-development-analytics-aqu')
@@ -47,8 +54,10 @@ def test_helpers_in_dev_environment(monkeypatch):
assert util.is_wmf_airflow_instance() is False
assert util.airflow_environment_name() == 'dev_wmf'
def test_resolve_kwargs_default_args(monkeypatch):
assert util.resolve_kwargs_default_args({}, 'k') is None
assert util.resolve_kwargs_default_args({'k': 'v'}, 'k') == 'v'
assert util.resolve_kwargs_default_args({'default_args': {'k': 'v'}}, 'k') == 'v'
assert util.resolve_kwargs_default_args({'k': 'v', 'default_args': {'k': 'v2'}}, 'k') == 'v'
- from typing import List, Optional
+ from typing import List, Optional, Union
from os import path
from workflow_utils.artifact import Artifact
from wmf_airflow_common import config as wmf_airflow_common_config
@@ -32,8 +32,10 @@ class ArtifactRegistry:
def artifact_url(self, artifact_name: str) -> str:
"""
- Gets the first expected cached url for the artifact id.
+ Gets the first expected cached url for the artifact name.
The artifact is not checked for existence at this url.
:param artifact_name:
"""
return str(self.artifact(artifact_name).cached_url())
@@ -88,6 +88,7 @@ class SkeinHook(BaseHook):
This is meant to be single use. Once submit() is called,
this cannot be used again.
"""
self.log.info('Constructing skein Client with kwargs: %s', self._client_kwargs)
return skein.Client(**self._client_kwargs)
@cached_property
@@ -6,7 +6,6 @@ from airflow.providers.apache.spark.hooks.spark_submit \
from wmf_airflow_common.hooks.skein import \
SkeinHookBuilder, parse_file_source
class SparkSubmitHook(AirflowSparkSubmitHook):
"""
Wraps the apache.spark SparkSubmitHook provided with Airflow to accomplish the following:
@@ -128,7 +127,8 @@ class SparkSubmitHook(AirflowSparkSubmitHook):
def _build_spark_submit_command(self, application: Optional[str] = None) -> List[str]:
"""
Overrides Airflow SparkSubmitHook to support some extra options, including
- application, driver_cores, driver_java_options.
+ application, driver_cores, driver_java_options. This also
+ extends env_vars support to have them set in Spark executors as well as the driver.
:param application:
Only used for compatibility with parent method. Will be used if set,
@@ -141,13 +141,20 @@ class SparkSubmitHook(AirflowSparkSubmitHook):
application = application or self._application
connection_cmd: List[str] = super()._build_spark_submit_command(application)
# If _env_vars, then make sure these are set in executor as well as
# driver. (Parent Airflow SparkSubmitHook handles setting them for driver).
if self._env_vars and (self._is_kubernetes or self._is_yarn):
for key, val in self._env_vars.items():
connection_cmd.insert(1, f'spark.executorEnv.{key}={val}')
connection_cmd.insert(1, '--conf')
if self._driver_cores:
connection_cmd.insert(1, self._driver_cores)
connection_cmd.insert(1, "--driver-cores")
connection_cmd.insert(1, '--driver-cores')
if self._driver_java_options:
connection_cmd.insert(1, self._driver_java_options)
connection_cmd.insert(1, "--driver-java-options")
connection_cmd.insert(1, '--driver-java-options')
self.log.info('%s spark-submit final cmd: %s', self, self._mask_cmd(connection_cmd))
return connection_cmd
@@ -174,6 +181,7 @@ class SparkSubmitHook(AirflowSparkSubmitHook):
return desc
class SparkSkeinSubmitHook(SparkSubmitHook):
"""
Overrides our SparkSubmitHook to:
@@ -227,6 +235,16 @@ class SparkSkeinSubmitHook(SparkSubmitHook):
"""
super().__init__(**kwargs)
if self._keytab is None:
self.log.warning(
'%s launching via skein without a keytab. '
'If the Spark job needs to interact with services that require '
'Kerberos authentication (HDFS, Hive, etc.), the job will fail '
'because it cannot authenticate with Kerberos from the YARN '
'application master without a keytab.',
self
)
self._command_preamble = command_preamble
skein_hook_builder = SkeinHookBuilder()
@@ -426,3 +444,106 @@ class SparkSkeinSubmitHook(SparkSubmitHook):
'memory': memory,
'vcores': vcores,
}
def kwargs_for_virtualenv(
virtualenv_archive: str,
entry_point: Optional[str] = None,
application: Optional[str] = None,
use_virtualenv_python: bool = True,
use_virtualenv_spark: bool = False,
**kwargs: Any,
) -> dict:
"""
Helper method to construct SparkSubmit kwargs for use with a virtualenv.
This should work with both Conda and regular virtualenvs.
One of entry_point or application must be set.
:param virtualenv_archive:
URL to the virtualenv archive file. If no '#alias' suffix is provided,
the unpacked alias will be 'venv'.
:param entry_point:
This should be a relative path to your spark job file
inside of the virtualenv_archive.
If you set this, you MUST NOT set application.
:param application:
SparkSubmitHook application. If you set this, you MUST NOT
set entry_point.
:param use_virtualenv_python:
Whether python inside of the venv should be used.
Defaults to True.
:param use_virtualenv_spark:
Whether bin/spark-submit inside of the venv should be used.
Defaults to False.
Note that if you set this to True, the spark_home and spark_binary
kwargs will be overridden.
:raises ValueError:
- If virtualenv_archive is not a .zip, .tar.gz or .tgz file.
- If both entry_point and application are provided, or if neither is.
:return: dict of kwargs
"""
ext_found = False
for ext in ('.zip', '.tar.gz', '.tgz'):
if virtualenv_archive.endswith(ext) or f'{ext}#' in virtualenv_archive:
ext_found = True
break
if not ext_found:
raise ValueError(
f'virtualenv_archive must be a .zip, .tar.gz or .tgz file. Was {virtualenv_archive}'
)
# Set archives.
if '#' in virtualenv_archive:
alias = virtualenv_archive.rsplit('#', 1)[1]
else:
alias = 'venv'
virtualenv_archive += '#' + alias
if 'archives' in kwargs:
kwargs['archives'] = virtualenv_archive + ',' + kwargs['archives']
else:
kwargs['archives'] = virtualenv_archive
# Set kwargs[application].
if (entry_point and application) or (not entry_point and not application):
raise ValueError('Must set only one of entry_point or application.')
if entry_point:
# Set application to the entry_point path in the unpacked virtualenv.
kwargs['application'] = os.path.join(alias, entry_point)
else:
kwargs['application'] = application
# Set PYSPARK_PYTHON* vars.
env_vars = {}
if use_virtualenv_python:
python_exec = os.path.join(alias, 'bin', 'python')
env_vars['PYSPARK_PYTHON'] = python_exec
# Setting PYSPARK_DRIVER_PYTHON will only work if
# python exec is available where spark-submit is being
# called from. This will work well if launcher == 'skein'
# because we unpack the archive on the skein app master
# and call spark-submit from there. If launcher != skein
# and deploy_mode != cluster, this requires that python_exec
# is available locally to the Airflow Scheduler.
if kwargs.get('deploy_mode') != 'cluster':
env_vars['PYSPARK_DRIVER_PYTHON'] = python_exec
env_vars.update(kwargs.get('env_vars', {}))
kwargs['env_vars'] = env_vars
# Set spark_binary
if use_virtualenv_spark:
kwargs['spark_binary'] = os.path.join(alias, 'bin', 'spark-submit')
# Make sure spark_home is not set, since spark_binary will be a path relative to the unpacked archive.
kwargs['spark_home'] = None
return kwargs
@@ -3,10 +3,16 @@ from typing import Any, Dict, Optional, Union
from airflow.exceptions import AirflowFailException
from airflow.providers.apache.spark.operators.spark_submit \
import SparkSubmitOperator as AirflowSparkSubmitOperator
- from wmf_airflow_common.hooks.spark import SparkSubmitHook, SparkSkeinSubmitHook
+ from wmf_airflow_common.hooks.spark import (
+     SparkSubmitHook,
+     SparkSkeinSubmitHook,
+     kwargs_for_virtualenv
+ )
from wmf_airflow_common.util import resolve_kwargs_default_args
class SparkSubmitOperator(AirflowSparkSubmitOperator):
"""
A SparkSubmitOperator that uses wmf_airflow_common SparkSubmitHook to add
@@ -19,9 +25,14 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
'_driver_java_options',
)
"""
List of available values for the launcher param.
"""
available_launchers = ('local', 'skein')
def __init__(
self,
- launcher: Optional[str] = None,
+ launcher: str = 'local',
avoid_local_execution: bool = False,
command_preamble: Optional[str] = None,
driver_cores: Optional[int] = None,
@@ -47,10 +58,11 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
:param launcher:
launcher to use to run spark-submit command. Only supported
- launcher is 'skein' which will result in using our SparkSkeinSubmnitHook.
- If launcher is anything else, spark-submit will be run locally via our
+ launchers are 'local' (default) and 'skein'.
+ If 'local', spark-submit will be run locally via our
SparkSubmitHook.
- Default: None
+ If 'skein', our SparkSkeinSubmitHook will be used.
+ Default: 'local'
:param command_preamble:
Command to prefix before the spark-submit command.
@@ -92,6 +104,12 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
self._skein_client_log_level = skein_client_log_level
self._skein_app_log_collection_enabled = skein_app_log_collection_enabled
if self._launcher not in self.available_launchers:
raise AirflowFailException(
f'launcher must be one of {",".join(self.available_launchers)}, '
f'was: {self._launcher}'
)
if avoid_local_execution and self._launcher != 'skein' and self._deploy_mode != 'cluster':
raise AirflowFailException(
'Avoiding local Spark execution. '
@@ -173,6 +191,87 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
return hook
@classmethod
def for_virtualenv(
cls,
virtualenv_archive: str,
entry_point: str,
use_virtualenv_python: bool = True,
use_virtualenv_spark: bool = False,
**kwargs: Any,
) -> 'SparkSubmitOperator':
"""
Factory method for using a SparkSubmitOperator with a virtualenv.
This should work with both Conda and regular virtualenvs.
Example:
::
op = SparkSubmitOperator.for_virtualenv(
# This will by default be unpacked on the
# workers as 'venv'. You can change this by setting
# an '#my_alias_here' suffix on this archive url.
virtualenv_archive = 'hdfs:///path/to/my_conda_env.tgz#env_alias',
# Path to the pyspark job file you want to execute, relative
# to the archive. (Note that this must end with '.py' for spark-submit
# to realize it is a pyspark job.)
entry_point = 'bin/my_pyspark_job.py'
# This causes PYSPARK_PYTHON, etc. to be set to <alias>/bin/python
use_virtualenv_python = True,
# This causes spark_binary to be set to <alias>/bin/spark-submit.
# Only set this if you include pyspark as a dependency package in your venv.
use_virtualenv_spark = True,
)
:param virtualenv_archive:
URL to the virtualenv archive file. If no '#alias' suffix is provided,
the unpacked alias will be 'venv'.
:param entry_point:
This should be a relative path to your spark job file
inside of the virtualenv_archive.
:param use_virtualenv_python:
Whether python inside of the venv should be used.
Defaults to True.
:param use_virtualenv_spark:
Whether bin/spark-submit inside of the venv should be used.
Defaults to False.
Note that if you set this to True, the spark_home and spark_binary
kwargs will be overridden.
Note that skein will be the default 'launcher' if not explicitly set in kwargs.
:return: SparkSubmitOperator
"""
# Default launcher=skein.
# This will usually be what is needed to use a virtualenv archive,
# as it is not expected that an unpacked virtualenv archive will be
# available locally.
if 'launcher' not in kwargs:
kwargs['launcher'] = 'skein'
# NOTE: This uses kwargs_for_virtualenv (from wmf_airflow_common.hooks.spark) to
# construct the kwargs for a SparkSubmitOperator. The helper lives in the hooks
# module so that SparkSubmitHook itself can use it as well.
# This ONLY works here because SparkSubmitHook and SparkSubmitOperator use
# the same kwarg names, as SparkSubmitOperator is essentially a proxy
# for SparkSubmitHook.
spark_submit_kwargs = kwargs_for_virtualenv(
virtualenv_archive=virtualenv_archive,
entry_point=entry_point,
use_virtualenv_python=use_virtualenv_python,
use_virtualenv_spark=use_virtualenv_spark,
**kwargs
)
return cls(**spark_submit_kwargs)
class SparkSqlOperator(SparkSubmitOperator):
"""
from typing import Optional, Any
import os
from urllib.parse import urlparse
from pathlib import Path
import airflow
from airflow.exceptions import AirflowConfigException
from pyarrow.fs import HadoopFileSystem
@@ -69,9 +72,21 @@ def is_wmf_airflow_instance() -> bool:
return env_name is not None and env_name.startswith('wmf')
def is_relative_uri(uri: str) -> bool:
"""
Returns False if the URI starts with a '/' or has a protocol scheme,
else True.
:param uri: URI to check for relativeness.
:return: True if the uri is relative
"""
return not (Path(uri).is_absolute() or bool(urlparse(uri).scheme))
def resolve_kwargs_default_args(kwargs: dict, key: str) -> Any:
"""
Returns the value found in kwargs for key if it exists, or the value
found in kwargs['default_args'], or None.
"""
return kwargs.get(key, kwargs.get('default_args', {}).get(key))
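# A quick illustration of the precedence described above (the 'queue' key is
# hypothetical): an explicit kwarg wins over the same key in default_args,
# and None is returned when the key appears in neither.
assert resolve_kwargs_default_args({'queue': 'q1', 'default_args': {'queue': 'q2'}}, 'queue') == 'q1'
assert resolve_kwargs_default_args({'default_args': {'queue': 'q2'}}, 'queue') == 'q2'
assert resolve_kwargs_default_args({}, 'queue') is None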