"""
This module helps override DAG properties dynamically, altering them
without changing the DAG file in version control.
To achieve that, it uses Airflow Variables. Variables can be populated
from Airflow's UI and also from the command line, and they can be read
from Airflow code (e.g. within a DAG file).

=== To set up a variable in Airflow's UI:
 * In the UI, click on Admin > Variables.
 * Add a new record (Key is the variable name, Val is the value).
This way of altering DAG configs might be useful, e.g., when re-running
a production DAG with a different start_date.

=== To set up a variable in the CLI:
 * Export an environment variable in the same context where Airflow runs.
 * The env var name should be 'AIRFLOW_VAR_<YOUR_VARIABLE_NAME>', since
   Airflow upper-cases the variable name when looking it up.
This way of altering DAG configs might be useful when testing a DAG while
developing it.
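The naming rule can be sketched with the standard library alone. This assumes
Airflow's environment-variable lookup (prefix AIRFLOW_VAR_ plus the upper-cased
key); env_var_name and read_env_variable are hypothetical helpers for
illustration only, since the real lookup happens inside Variable.get().

```python
import json
import os


def env_var_name(variable_name: str) -> str:
    # Hypothetical helper: Airflow looks for AIRFLOW_VAR_<KEY>,
    # with the key upper-cased.
    return 'AIRFLOW_VAR_' + variable_name.upper()


def read_env_variable(variable_name: str) -> dict:
    # Hypothetical helper: returns the parsed JSON value, or {} if unset,
    # roughly like Variable.get(..., deserialize_json=True, default_var={}).
    raw = os.environ.get(env_var_name(variable_name))
    return {} if raw is None else json.loads(raw)


# Simulates `export AIRFLOW_VAR_MY_DAG_CONFIG=...` in the Airflow environment.
os.environ['AIRFLOW_VAR_MY_DAG_CONFIG'] = '{"start_date": "2022-02-01T00"}'
print(read_env_variable('my_dag_config'))  # {'start_date': '2022-02-01T00'}
```

From a shell, the equivalent export would be:
    export AIRFLOW_VAR_MY_DAG_CONFIG='{"start_date": "2022-02-01T00"}'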

=== Variable format:
For this module to work, the variable value must be a string containing a
JSON object. For instance: '{"prop1": "val1", "prop2": {"prop3": "val3"}}'
If the specified variable exists but its value is not valid JSON,
the code will raise a ValueError.
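A minimal sketch of the format check, assuming plain json parsing as done by
Variable.get(..., deserialize_json=True). parse_variable_value is a
hypothetical helper, and rejecting JSON that is not an object is an extra
assumption of this sketch, made because properties are looked up by key.

```python
import json
from typing import Any, Dict


def parse_variable_value(variable_name: str, raw_value: str) -> Dict[str, Any]:
    # Hypothetical helper: parse the raw Variable string as JSON.
    try:
        parsed = json.loads(raw_value)
    except json.JSONDecodeError as e:
        # Re-raise as a plain ValueError, keeping the original error as context.
        raise ValueError(
            f'Variable {variable_name} can not be parsed as JSON.'
        ) from e
    # Assumption: only JSON objects are useful, since properties are keys.
    if not isinstance(parsed, dict):
        raise ValueError(f'Variable {variable_name} is not a JSON object.')
    return parsed


config = parse_variable_value(
    'my_dag_config', '{"prop1": "val1", "prop2": {"prop3": "val3"}}'
)
print(config['prop2']['prop3'])  # val3
```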

=== Add the code to your DAG file:
In your DAG file create a VariableProperties instance with:
    var_props = VariableProperties('<your_variable_name>')
Then use it when passing properties to, e.g., DAGs, Sensors and Operators:
    var_props.get('<property_name>', '<default_value>')
Here, <property_name> is the property key within the Variable's JSON value,
and <default_value> is the value that var_props.get() will return in case
the Variable is not defined, or its JSON does not contain that property.

=== Precedence of property values:
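When you ask for a property value with `var_props.get()`,
this module returns the first value found, in this order of precedence:
 * The value specified via the CLI (environment variable), if defined.
 * The value specified via Airflow's UI, if defined.
 * The default value passed to var_props.get(...).
Precedence applies to the whole Variable value, not to each property:
if the environment variable is defined, the Variable set in the UI is
ignored entirely, even for properties the environment variable lacks.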
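The precedence rule can be sketched without Airflow, assuming (as Airflow's
secrets lookup does) that one whole Variable value is picked, the environment
variable first and then the metastore behind the UI. resolve_property is a
hypothetical helper, not part of this module or of Airflow.

```python
from typing import Any, Dict, Optional


def resolve_property(
    env_value: Optional[Dict[str, Any]],
    ui_value: Optional[Dict[str, Any]],
    property_name: str,
    default_value: Any,
) -> Any:
    # Hypothetical helper. The env var (CLI) value, if set, wins as a whole;
    # otherwise the UI (metastore) value is used; otherwise an empty dict.
    if env_value is not None:
        variable = env_value
    elif ui_value is not None:
        variable = ui_value
    else:
        variable = {}
    # Finally fall back to the caller's default, as var_props.get() does.
    return variable.get(property_name, default_value)


env = {'start_date': '2022-03-01T00'}
ui = {'start_date': '2022-02-01T00', 'retries': 3}
print(resolve_property(env, ui, 'start_date', None))  # 2022-03-01T00
print(resolve_property(env, ui, 'retries', 1))  # 1 (the UI value is ignored)
print(resolve_property(None, ui, 'retries', 1))  # 3
```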

=== Special property values
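If the property that you want to override is of type datetime, timedelta,
list or dict, use the following getters instead of var_props.get():

 * var_props.get_datetime(property_name, default_value):
   Expects the corresponding value stored in the Variable JSON to be an ISO8601
   timestamp ("2022-02-01T00"), and will raise a ValueError otherwise. It will
   parse it with datetime.fromisoformat (which, before Python 3.11, accepts
   only a subset of ISO8601) and return a datetime object. The default value
   must be a datetime.

 * var_props.get_timedelta(property_name, default_value):
   Expects the corresponding value stored in the Variable JSON to be an ISO8601
   duration ("P7DT4H"), and will raise a ValueError otherwise. It will parse it
   with isodate and return a timedelta object; note that isodate returns a
   Duration object instead for durations with years or months (e.g. "P1M").
   The default value must be a timedelta.

 * var_props.get_list(property_name, default_value):
   Expects the corresponding value stored in the Variable JSON to be a JSON
   array, and will raise a ValueError otherwise. It returns it as a python
   list without further parsing. The default value must be a list.

 * var_props.get_merged(property_name, default_value):
   Expects the corresponding value stored in the Variable JSON to be a JSON
   object, and will raise a TypeError otherwise. It will return a dict
   containing all fields of default_value plus all fields of the Variable
   JSON object; on duplicate keys, the Variable's value wins. The merge is
   shallow: a nested object in the Variable replaces the corresponding
   default entirely. The default value must be a python dict.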
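For illustration, the parsing and merging behind get_datetime and get_merged
can be approximated with the standard library alone. This is a minimal sketch,
not the module's code: the variable dict below is a hypothetical stand-in for
a parsed Variable JSON value, and get_timedelta is left out because it relies
on the third-party isodate package.

```python
from datetime import datetime
from typing import Any, Dict

# Hypothetical stand-in for the parsed JSON value of an Airflow Variable.
variable: Dict[str, Any] = {
    'start_date': '2022-02-01T00',
    'default_args': {'retries': 5, 'params': {'a': 10}},
}

# Like get_datetime: the stored ISO8601 string is parsed with fromisoformat.
start_date = datetime.fromisoformat(variable['start_date'])

# Like get_merged: a shallow merge where Variable fields override defaults.
default_args = {
    'retries': 1,
    'email_on_failure': False,
    'params': {'a': 1, 'b': 2},
}
merged = {**default_args, **variable['default_args']}

print(start_date)  # 2022-02-01 00:00:00
print(merged)  # {'retries': 5, 'email_on_failure': False, 'params': {'a': 10}}
```

Note how params in the result has lost key 'b': the nested default is
replaced, not merged.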
"""

from typing import cast, Any, Callable

from datetime import datetime, timedelta
from json import JSONDecodeError

from airflow.models import Variable
from isodate import parse_duration

class VariableProperties:

    def __init__(self, variable_name: str):
        try:
            self.variable: dict = Variable.get(
                variable_name,
                deserialize_json=True,
                default_var={},
            )
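        except JSONDecodeError as e:
            raise ValueError(
                f'Variable {variable_name} can not be parsed as JSON.'
            ) from e
        # The getters look properties up by key, so the value must be an object.
        if not isinstance(self.variable, dict):
            raise ValueError(
                f'Variable {variable_name} is not a JSON object.'
            )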

    def get(self, property_name: str, default_value: Any) -> Any:
        return self.variable.get(property_name, default_value)

    def get_datetime(self, property_name: str, default_value: datetime) -> datetime:
        # isinstance also accepts subclasses such as pendulum datetimes.
        if not isinstance(default_value, datetime):
            raise ValueError('Default value is not a datetime.')
        return cast(
            datetime,
            self.get_parsed(property_name, datetime.fromisoformat, default_value)
        )

    def get_list(self, property_name: str, default_value: list) -> list:
        if not isinstance(default_value, list):
            raise ValueError('Default value is not a list.')
        # JSON arrays are already deserialized into lists, so no parsing is needed.
        result = self.get_parsed(property_name, lambda value: value, default_value)
        if not isinstance(result, list):
            raise ValueError(f'Property {property_name} is not a list.')
        return result

    def get_timedelta(self, property_name: str, default_value: timedelta) -> timedelta:
        if not isinstance(default_value, timedelta):
            raise ValueError('Default value is not a timedelta.')
        return cast(
            timedelta,
            self.get_parsed(property_name, parse_duration, default_value)
        )

    def get_parsed(
        self,
        property_name: str,
        parser: Callable[[Any], Any],
        default_value: Any
    ) -> Any:
        if property_name not in self.variable:
            return default_value
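        try:
            return parser(self.variable[property_name])
        except (ValueError, TypeError) as e:
            # TypeError covers non-string values (e.g. a number) passed to the parser.
            raise ValueError(
                f'Property {property_name} can not be parsed.'
            ) from e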

    def get_merged(self, property_name: str, default_value: dict) -> dict:
        if not isinstance(default_value, dict):
            raise ValueError('Default value is not a dict.')
        if property_name not in self.variable:
            return default_value
        try:
            return {
                **default_value,
                **self.variable[property_name]
            }
        except TypeError as e:
            # Unpacking a non-mapping (list, string, number) raises TypeError.
            raise TypeError(
                f'Property {property_name} is not a JSON object.'
            ) from e