Commit 17187136 authored by Gmodena

Merge branch 'T293382-dags-testing' into 'multi-project-dags-repo'

Add tests for airflow dags

See merge request !6
parents 33e1232c 5f41906a
Pipeline #1179 failed
include Makefile.conda
branch := $(shell git rev-parse --abbrev-ref HEAD)
short_commit_hash := $(shell git rev-parse --short=8 HEAD)
airflow_host := an-airflow1003.eqiad.wmnet
...@@ -25,6 +27,11 @@ ima-venv:
	rm -f ${ima_home}/${ima_venv_archive}
	make -C ${ima_home} venv
test_dags: ${pip_requirements_test}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
	pip install -r ${pip_requirements_test}; \
	python -m pytest tests/"
test:
	cd ${ima_home}; make mypy; make test
...
SHELL := /bin/bash
venv := venv
venv_archive_format := tar.gz
pip_requirements := requirements.txt
pip_requirements_test := requirements-test.txt
conda_python_version = 3.7
pyspark_version = 2.4.5
extra_pypi := https://gitlab.wikimedia.org/api/v4/projects/40/packages/pypi/simple
CONDA_CMD := conda config --set pip_interop_enabled True; conda create -n ${venv} python=${conda_python_version}; conda init bash; source ~/.bashrc && conda activate ${venv}
ifneq ($(SKIP_DOCKER),true)
CURRENT_DIR := $(shell pwd)
DOCKER_IMG := continuumio/miniconda3
DOCKER_CMD := docker run -it \
	--rm \
	-v ${CURRENT_DIR}:/root \
	-e SKIP_DOCKER=true \
	-w /root ${DOCKER_IMG}
endif
venv: ${pip_requirements}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
	pip install --extra-index-url ${extra_pypi} -r ${pip_requirements}; \
	conda deactivate; \
	conda install conda-pack; \
	conda-pack -n ${venv} --format ${venv_archive_format}"
test: ${pip_requirements_test}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
	pip install -r ${pip_requirements_test}; \
	python -m pytest tests/"
SHELL := /bin/bash
venv := venv
include ../Makefile.conda
venv_archive_format := tar.gz
pip_requirements := requirements.txt
pip_requirements_test := requirements-test.txt
conda_python_version = 3.7
pyspark_version = 2.4.5
extra_pypi := https://gitlab.wikimedia.org/api/v4/projects/40/packages/pypi/simple
CONDA_CMD := conda config --set pip_interop_enabled True; conda create -n ${venv} python=${conda_python_version}; conda init bash; source ~/.bashrc && conda activate ${venv}
ifneq ($(SKIP_DOCKER),true)
DOCKER_IMG := continuumio/miniconda3
DOCKER_CMD := docker run -it \
	--rm \
	-v /Users/gmodena/repos/gitlab/platform-airflow-dags/image-matching:/root \
	-e SKIP_DOCKER=true \
	-w /root ${DOCKER_IMG}
endif
venv: ${pip_requirements}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
	pip install --extra-index-url ${extra_pypi} -r ${pip_requirements}; \
	conda deactivate; \
	conda install conda-pack; \
	conda-pack -n ${venv} --format ${venv_archive_format}"
mypy: ${pip_requirements_test}
	${DOCKER_CMD} bash -c "export CONDA_ALWAYS_YES=true; ${CONDA_CMD}; \
...
pytest==6.2.5
apache-airflow==2.2.0
apache-airflow-providers-papermill==2.1.0
"""
Modified from
https://github.com/danielvdende/data-testing-with-airflow/blob/master/integrity_tests/test_dag_integrity.py.
Testing Airflow workflows - ensuring your DAGs work before going into production, https://www.youtube.com/watch?v=ANJnYbLwLjE&t=1184s
DAG integrity test for airflow.
"""
import glob
import importlib.util
from os import path
import pytest
from airflow import models as airflow_models
from airflow.utils.dag_cycle_tester import check_cycle
DAGS_PATH = glob.glob(path.join(path.dirname(__file__), '..', '..', 'dags', '*.py'))
def _exec_module(name: str, location: str):
    """Import and execute a Python module from an arbitrary file location."""
    spec = importlib.util.spec_from_file_location(name, location)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


@pytest.mark.parametrize('dag_path', DAGS_PATH)
def test_dag_integrity(dag_path):
    dag_name = path.basename(dag_path)
    module = _exec_module(dag_name, str(dag_path))
    # Collect all DAG objects defined at module level
    dag_objects = [var for var in vars(module).values() if isinstance(var, airflow_models.DAG)]
    # Ensure that each file under dags/ defines at least one DAG
    assert dag_objects
    # Ensure that all dags are acyclic
    for dag in dag_objects:
        check_cycle(dag)
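
For illustration, a minimal sketch of a DAG module that this integrity test would pick up if placed under dags/. The file name, dag_id and operators are hypothetical and not part of this merge request; the test only requires that the module define at least one airflow.models.DAG object with no task cycles.

# Hypothetical dags/example_dag.py, for illustration only.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

# Any DAG object defined at module level is collected by test_dag_integrity.
with DAG(
    dag_id="example_dag",
    start_date=datetime(2021, 10, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")
    # A simple linear dependency keeps the DAG acyclic, so check_cycle() passes.
    start >> end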