Commit de8615bc authored by Fabian Kaelin, committed by Mforns
Browse files

Airflow development improvements

parent b9be20d2
......@@ -57,6 +57,7 @@ print_usage () {
echo " Default: /usr/lib/airflow"
echo " -n DEV_ENV_NAME Name of the development Conda environment."
echo " Default: airflow_development"
echo " -i Install wmf_airflow_common into conda env"
}
# Helper: Clean up Airflow services and conda environment.
......@@ -106,7 +107,7 @@ assert () {
script_dir="$(cd -- "$(dirname "${0}")" > "/dev/null" 2>&1 ; pwd -P)"
# Parse optional arguments.
while getopts "hm:a:p:b:n:" options; do
while getopts "hm:a:p:b:n:i" options; do
case "${options}" in
h)
print_usage
......@@ -142,6 +143,9 @@ while getopts "hm:a:p:b:n:" options; do
error "DEV_ENV_NAME ${dev_env_name} is invalid."
fi
;;
i)
install_wmf_airflow_common=true
;;
*)
print_usage
error "Invalid arguments."
......@@ -172,6 +176,9 @@ fi
if [ -z "${dev_env_name}" ]; then
dev_env_name="airflow_development"
fi
if [ -z "${install_wmf_airflow_common}" ]; then
install_wmf_airflow_common=false
fi
# Parse mandatory arguments.
shift $((OPTIND - 1)) # Shift away optional arguments.
......@@ -236,17 +243,29 @@ conda_execs_script="/usr/lib/airflow/etc/profile.d/conda.sh"
source "$conda_execs_script"
assert "Can not add Conda executables to PATH ($conda_execs_script failed)."
webproxy="http://webproxy.eqiad.wmnet:8080"
# Set http(s)_proxy to allow for package downloads.
export http_proxy="${webproxy}"
export https_proxy="${webproxy}"
if ! test -d "${conda_home}/envs/${dev_env_name}"; then
# Set http(s)_proxy to allow for package downloads.
export http_proxy="${webproxy}"
export https_proxy="${webproxy}"
conda create --clone "${base_env_path}" --name "${dev_env_name}" > "${airflow_home}/conda.log" 2>&1
assert "Can not create Conda environment ${dev_env_name} using ${base_env_path}."
unset http_proxy
unset https_proxy
fi
conda activate "${dev_env_name}" >> "${airflow_home}/conda.log" 2>&1
assert "Can not activate Conda environment ${dev_env_name}."
if [ "${install_wmf_airflow_common}" = true ] ; then
echo " Installing wmf_airflow_dags into conda ${conda_home}/envs/${dev_env_name}..."
cd "${script_dir}"
pip install .
assert "Can not install ${script_dir} in conda environment ${dev_env_name}."
cd "${airflow_home}"
fi
unset http_proxy
unset https_proxy
# Set the following env vars to pick the "wmf" defaults for the dag config.
export AIRFLOW_ENVIRONMENT_NAME="dev_wmf"
export AIRFLOW_INSTANCE_NAME="${dev_env_name}_${airflow_project}_$(whoami)"
......
......@@ -176,6 +176,7 @@ def test_build_spark_submit_command(spark_submit_hook_kwargs, expected):
pytest.param(
{
'command_preamble': 'export VAR=VAL1; ',
'command_postamble': ' || sleep 100',
'application': 'fake_app.jar',
'master': 'yarn',
'name': 'fake_name',
......@@ -203,6 +204,7 @@ def test_build_spark_submit_command(spark_submit_hook_kwargs, expected):
'--deploy-mode',
'client',
'fake_app.jar',
' || sleep 100',
],
id="local files paths are modified but remote ones are not, with command_preamble",
),
......
......@@ -201,6 +201,7 @@ class SparkSkeinSubmitHook(SparkSubmitHook):
def __init__(
self,
command_preamble: Optional[str] = None,
command_postamble: Optional[str] = None,
skein_master_log_level: str = 'INFO',
skein_client_log_level: str = 'INFO',
skein_app_log_collection_enabled: bool = True,
......@@ -246,6 +247,8 @@ class SparkSkeinSubmitHook(SparkSubmitHook):
)
self._command_preamble = command_preamble
self._command_postamble = command_postamble
skein_hook_builder = SkeinHookBuilder()
......@@ -386,11 +389,9 @@ class SparkSkeinSubmitHook(SparkSubmitHook):
# Finally build the script based on the current SparkSubmitHook properties.
spark_submit_command = ' '.join(self._build_spark_submit_command())
# Prepend command_preamble and append command_postamble if they are set.
if self._command_preamble:
script = self._command_preamble + ' ' + spark_submit_command
else:
script = spark_submit_command
command_preamble = f'{self._command_preamble} ' if self._command_preamble else ''
command_postamble = f' {self._command_postamble}' if self._command_postamble else ''
script = f"{command_preamble}{spark_submit_command}{command_postamble}"
skein_hook_builder.script(script)
......@@ -418,7 +419,7 @@ class SparkSkeinSubmitHook(SparkSubmitHook):
def _skein_resources(
self,
default_memory: Union[int, str] = 1024,
default_memory: Union[int, str] = 4096,
default_vcores: Union[int, str] = 1
) -> dict:
"""
......
......@@ -22,6 +22,7 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
template_fields = AirflowSparkSubmitOperator.template_fields + (
'_command_preamble',
'_command_postamble',
'_driver_java_options',
)
......@@ -35,6 +36,7 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
launcher: str = 'local',
avoid_local_execution: bool = False,
command_preamble: Optional[str] = None,
command_postamble: Optional[str] = None,
driver_cores: Optional[int] = None,
driver_java_options: Optional[str] = None,
master: Optional[str] = 'local',
......@@ -94,6 +96,7 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
self._launcher = launcher
self._command_preamble = command_preamble
self._command_postamble = command_postamble
self._driver_cores = driver_cores
self._driver_java_options = driver_java_options
self._master = master
......@@ -181,6 +184,7 @@ class SparkSubmitOperator(AirflowSparkSubmitOperator):
if self._launcher == 'skein':
hook_kwargs.update({
'command_preamble': self._command_preamble,
'command_postamble': self._command_postamble,
'skein_master_log_level': self._skein_master_log_level,
'skein_client_log_level': self._skein_client_log_level,
'skein_app_log_collection_enabled': self._skein_app_log_collection_enabled,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment