test_dag_integrity.py 1.13 KB
Newer Older
Gmodena's avatar
Gmodena committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
"""
Modified from
https://github.com/danielvdende/data-testing-with-airflow/blob/master/integrity_tests/test_dag_integrity.py.
Testing Airflow workflows - ensuring your DAGs work before going into production, https://www.youtube.com/watch?v=ANJnYbLwLjE&t=1184s

DAG integrity test for airflow.
"""
import glob
import importlib.util
from os import path

import pytest
from airflow import models as airflow_models
from airflow.utils.dag_cycle_tester import check_cycle

DAGS_PATH = glob.glob(path.join(path.dirname(__file__), '..', '..', 'dags', '*.py'))


def _exec_module(name: str, location: str):
    spec = importlib.util.spec_from_file_location(name, location)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    return module


@pytest.mark.parametrize('dag_path', DAGS_PATH)
def test_dag_integrity(dag_path):
    dag_name = path.basename(dag_path)
    module = _exec_module(dag_name, str(dag_path))
    dag_objects = [var for var in vars(module).values() if isinstance(var, airflow_models.DAG)]
    assert dag_objects

    # Ensure  that all dags are acyclic
    for dag in dag_objects:
        check_cycle(dag)