Commit a36dc95c authored by Aqu

Use spark 3 bin & conf provided by deb package

parent 4c973d64
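In short, the hunks below swap every use of dag_config.spark3_default_args (built by the experimental_spark_3_dag_default_args helper) for the plain dag_config.default_args, which, per the commit title, is assumed to pick up the Spark 3 binary and configuration installed by the Debian package. A minimal sketch of the pattern the DAGs are left with, using an illustrative dag_id and SLA only:

# Sketch of the resulting pattern; names other than default_args and VariableProperties
# mirror the hunks in this diff, while the example dag_id and the Spark settings are assumptions.
from datetime import timedelta

from analytics.config.dag_config import default_args          # previously spark3_default_args
from wmf_airflow_common.config.variable_properties import VariableProperties

var_props = VariableProperties('example_dag_config')           # hypothetical dag_id

dag_default_args = var_props.get_merged(
    'default_args',
    {
        **default_args,   # assumed to now carry the deb-packaged Spark 3 bin & conf settings
        'sla': timedelta(hours=6),
    },
)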
......@@ -43,12 +43,6 @@ artifacts:
id: org.wikimedia.analytics.refinery.job:refinery-job:jar:shaded:0.2.3
source: wmf_archiva_releases
# Used to override the default spark3-NoCLIDriver
# Should be removed once removed from the cassandra jobs
refinery-job-0.2.7-shaded.jar:
id: org.wikimedia.analytics.refinery.job:refinery-job:jar:shaded:0.2.7
source: wmf_archiva_releases
datahub-cli-0.8.38.tgz:
id: datahub:cli:tgz:0.8.38
source: wmf_archiva_python
......
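The removed 0.2.7 jar was only referenced by the cassandra load DAGs (see their hunks further down); artifacts declared here are resolved at DAG-definition time through the ArtifactRegistry set up in dag_config. A rough sketch of that resolution, assuming artifact_url simply maps a declared key to a fetchable URL:

# Rough sketch; the registry internals are not part of this diff.
from wmf_airflow_common.artifact import ArtifactRegistry

artifact_registry = ArtifactRegistry.for_wmf_airflow_instance('analytics')
artifact = artifact_registry.artifact_url

# With refinery-job-0.2.7-shaded.jar dropped from artifacts.yaml, only keys still declared
# there resolve; the 0.2.3 key name below is assumed to follow the same naming pattern.
refinery_job_jar = artifact('refinery-job-0.2.3-shaded.jar')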
from wmf_airflow_common.config import dag_default_args
from wmf_airflow_common.config import experimental_spark_3_dag_default_args
from wmf_airflow_common.artifact import ArtifactRegistry
from wmf_airflow_common.util import is_wmf_airflow_instance
......@@ -42,5 +41,4 @@ artifact_registry = ArtifactRegistry.for_wmf_airflow_instance('analytics')
artifact = artifact_registry.artifact_url
# Default arguments for all operators used by this airflow instance.
default_args = dag_default_args.get(instance_default_args)
spark3_default_args = experimental_spark_3_dag_default_args.get(instance_default_args, artifact_registry)
default_args = dag_default_args.get(instance_default_args)
\ No newline at end of file
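The experimental helper took the artifact registry as a second argument (presumably to wire a packaged Spark 3 assembly into the defaults), while the remaining dag_default_args.get only takes the instance defaults. One plausible shape of that remaining helper, purely for illustration:

# Illustrative sketch only; the real helper lives in wmf_airflow_common.config.dag_default_args
# and its actual keys and values are not shown in this diff.
from datetime import timedelta

def get(instance_default_args: dict) -> dict:
    """Layer instance-specific values over library-wide Airflow defaults (assumed behavior)."""
    common_defaults = {
        'owner': 'analytics',                  # hypothetical values
        'retries': 5,
        'retry_delay': timedelta(minutes=20),
    }
    return {**common_defaults, **instance_default_args}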
......@@ -18,7 +18,7 @@ dag = AnomalyDetectionDAG(
metric_source=metric_source,
doc_md=__doc__,
default_args=var_props.get_merged('default_args',
{ **dag_config.spark3_default_args,
{ **dag_config.default_args,
'sla': timedelta(hours=6),
}
),
......
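Each DAG keeps the same override mechanism around these defaults: VariableProperties(f'{dag_id}_config') reads a per-DAG Airflow Variable, and get_merged layers any 'default_args' found there over the in-code dict. A sketch of that lookup, assuming the Variable holds JSON and the in-code values are the fallback:

# Assumed behavior of VariableProperties.get_merged, shown standalone for illustration;
# the Variable name below is hypothetical.
import json
from datetime import timedelta

from airflow.models import Variable
from analytics.config.dag_config import default_args

def get_merged(variable_name: str, key: str, defaults: dict) -> dict:
    config = json.loads(Variable.get(variable_name, default_var='{}'))
    return {**defaults, **config.get(key, {})}

merged = get_merged(
    'example_anomaly_detection_config',
    'default_args',
    {**default_args, 'sla': timedelta(hours=6)},
)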
......@@ -24,7 +24,7 @@ dag = AnomalyDetectionDAG(
metric_source=metric_source,
doc_md=__doc__,
default_args=var_props.get_merged('default_args',
{ **dag_config.spark3_default_args,
{ **dag_config.default_args,
'sla': timedelta(hours=6),
}
),
......
......@@ -18,7 +18,7 @@ dag = AnomalyDetectionDAG(
metric_source=metric_source,
doc_md=__doc__,
default_args=var_props.get_merged('default_args',
{ **dag_config.spark3_default_args,
{ **dag_config.default_args,
'sla': timedelta(hours=10),
}
),
......
......@@ -11,7 +11,7 @@ from airflow import DAG
from wmf_airflow_common.config.variable_properties import VariableProperties
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
from wmf_airflow_common.operators.spark import SparkSubmitOperator
from analytics.config.dag_config import spark3_default_args, hql_directory, artifact
from analytics.config.dag_config import default_args, hql_directory, artifact
dag_id = 'apis_metrics_to_graphite_hourly'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -27,7 +27,7 @@ with DAG(
start_date=var_props.get_datetime('start_date', datetime(2022, 6, 14, 8)),
schedule_interval='@hourly',
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
# We use 6 hours as webrequest data needs to be present for the job to process and webrequest-sla is 5 hours
'sla': timedelta(hours=6),
}
......
......@@ -27,7 +27,7 @@ from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHive
from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.operators.spark import SparkSqlOperator
from wmf_airflow_common.partitions_builder import partition_names_by_granularity
from analytics.config.dag_config import spark3_default_args, hql_directory
from analytics.config.dag_config import default_args, hql_directory
dag_id = 'aqs_hourly'
source_table = 'wmf.webrequest'
......@@ -41,7 +41,7 @@ with DAG(
schedule_interval='@hourly',
tags=['spark', 'hql', 'hive'],
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(hours=6),
}
),
......
......@@ -18,7 +18,7 @@ from wmf_airflow_common.config.variable_properties import VariableProperties
from airflow.providers.apache.hive.sensors.named_hive_partition import (
NamedHivePartitionSensor)
from wmf_airflow_common.partitions_builder import daily_partitions
from analytics.config.dag_config import spark3_default_args, hql_directory
from analytics.config.dag_config import default_args, hql_directory
# Define Constants
......@@ -37,7 +37,7 @@ with DAG(
schedule_interval='@daily',
tags=['spark', 'hql', 'hive'],
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(hours=6),
}
),
......
......@@ -22,7 +22,7 @@ from airflow.providers.apache.hive.sensors.named_hive_partition import (
NamedHivePartitionSensor)
from wmf_airflow_common.partitions_builder import daily_partitions
from wmf_airflow_common.operators.spark import SparkSqlOperator
from analytics.config.dag_config import spark3_default_args, hql_directory, cassandra_default_conf, artifact
from analytics.config.dag_config import default_args, hql_directory, cassandra_default_conf, artifact
dag_id = 'cassandra_daily_load'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -130,7 +130,7 @@ with DAG(
tags=['cassandra', 'hql'],
default_args=var_props.get_merged('default_args',
{
**spark3_default_args,
**default_args,
'sla': timedelta(hours=10),
}
)
......@@ -150,12 +150,6 @@ with DAG(
etl = SparkSqlOperator(
task_id=f'load_{job_name}_to_cassandra',
# Override application jar (currently set to use SparkSQLNoCLIDriver).
# The one set in default_args contains a cassandra-connector that prevents the job
# from using the patched one provided in jars (classpath order issue).
# This parameter should be removed when we move to spark3-on-skein in client mode
# instead of SparkSQLNoCLIDriver in cluster mode.
application=var_props.get('refinery_job_jar', artifact('refinery-job-0.2.7-shaded.jar')),
sql=properties['hql_path'],
query_parameters={
**properties['hql_parameters'],
......
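With the 0.2.7 artifact gone, the per-task application= override disappears from all three cassandra load DAGs, and the operator falls back to whatever SQL runner default_args configures (assumed to be the deb-packaged Spark 3). A hedged sketch of the task as it reads after this hunk; anything not visible above is left out rather than guessed:

# Sketch only: parameters outside the visible hunk (for example any use of
# cassandra_default_conf) are omitted.
etl = SparkSqlOperator(
    task_id=f'load_{job_name}_to_cassandra',
    # No application= override any more: the default jar/driver from default_args is used.
    sql=properties['hql_path'],
    query_parameters={
        **properties['hql_parameters'],
        # ... remaining parameters are elided in the diff
    },
)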
......@@ -15,7 +15,7 @@ from airflow.providers.apache.hive.sensors.named_hive_partition import (
NamedHivePartitionSensor)
from wmf_airflow_common.partitions_builder import partition_names_by_granularity
from wmf_airflow_common.operators.spark import SparkSqlOperator
from analytics.config.dag_config import spark3_default_args, hql_directory, cassandra_default_conf, artifact
from analytics.config.dag_config import default_args, hql_directory, cassandra_default_conf, artifact
dag_id = 'cassandra_hourly_load'
......@@ -49,7 +49,7 @@ with DAG(
tags=['cassandra', 'hql'],
default_args=var_props.get_merged('default_args',
{
**spark3_default_args,
**default_args,
'sla': timedelta(hours=6),
}
)
......@@ -68,12 +68,6 @@ with DAG(
etl = SparkSqlOperator(
task_id=f'load_{job_name}_hourly_to_cassandra',
# Override application jar (currently set to use SparkSQLNoCLIDriver).
# The one set in default_args contains a cassandra-connector that prevents the job
# from using the patched one provided in jars (classpath order issue).
# This parameter should be removed when we move to spark3-on-skein in client mode
# instead of SparkSQLNoCLIDriver in cluster mode.
application=var_props.get('refinery_job_jar', artifact('refinery-job-0.2.7-shaded.jar')),
sql=properties['hql_path'],
query_parameters={
**properties['hql_parameters'],
......
......@@ -20,7 +20,7 @@ from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.sensors.hive import RangeHivePartitionSensor
from wmf_airflow_common.templates.time_filters import filters
from wmf_airflow_common.operators.spark import SparkSqlOperator
from analytics.config.dag_config import spark3_default_args, hql_directory, cassandra_default_conf, artifact
from analytics.config.dag_config import default_args, hql_directory, cassandra_default_conf, artifact
dag_id = 'cassandra_monthly_load'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -118,7 +118,7 @@ with DAG(
user_defined_filters=filters,
default_args=var_props.get_merged('default_args',
{
**spark3_default_args,
**default_args,
'sla': timedelta(days=2),
}
)
......@@ -137,12 +137,6 @@ with DAG(
etl = SparkSqlOperator(
task_id=f'load_{job_name}_to_cassandra',
# Override application jar (currently set to use SparkSQLNoCLIDriver).
# The one set in default_args contains a cassandra-connector that prevents the job
# from using the patched one provided in jars (classpath order issue).
# This parameter should be removed when we move to spark3-on-skein in client mode
# instead of SparkSQLNoCLIDriver in cluster mode.
application=var_props.get('refinery_job_jar', artifact('refinery-job-0.2.7-shaded.jar')),
sql=properties['hql_path'],
query_parameters={
**properties['hql_parameters'],
......
......@@ -35,7 +35,7 @@ from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
from analytics.config.dag_config import spark3_default_args, artifact
from analytics.config.dag_config import default_args, artifact
from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator
from wmf_airflow_common.operators.spark import SparkSubmitOperator
......@@ -57,7 +57,7 @@ clickstream_archive_base_path = var_props.get(
default_args = var_props.get_merged(
'default_args',
{
**spark3_default_args,
**default_args,
'poke_interval': timedelta(hours=2).total_seconds(),
'sla': timedelta(days=2),
}
......
......@@ -22,7 +22,7 @@ from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.operators.spark import SparkSubmitOperator
from wmf_airflow_common.sensors.url import URLSensor
from wmf_airflow_common.templates.time_filters import filters
from analytics.config.dag_config import hadoop_name_node, spark3_default_args, artifact
from analytics.config.dag_config import hadoop_name_node, default_args, artifact
dag_id = 'commons_structured_data_dump_to_hive_weekly'
dump_date = '{{ execution_date | add_days(7) | to_ds_nodash }}'
......@@ -36,7 +36,7 @@ with DAG(
schedule_interval='0 0 * * 1', # Weekly, starting on Monday at 00:00.
tags=['spark', 'dump', 'hive', 'structured_data'],
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(days=7),
}
),
......
......@@ -22,7 +22,7 @@ from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.operators.spark import SparkSubmitOperator
from wmf_airflow_common.sensors.url import URLSensor
from wmf_airflow_common.templates.time_filters import filters
from analytics.config.dag_config import hadoop_name_node, spark3_default_args, artifact
from analytics.config.dag_config import hadoop_name_node, default_args, artifact
dag_id = 'wikidata_dump_to_hive_weekly'
dump_date = '{{ execution_date | add_days(7) | to_ds_nodash }}'
......@@ -36,7 +36,7 @@ with DAG(
schedule_interval='0 0 * * 1', # Weekly, starting on Monday at 00:00.
tags=['spark', 'dump', 'hive', 'wikidata'],
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(days=7),
}
),
......
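Both dump DAGs template dump_date with custom Jinja filters from wmf_airflow_common.templates.time_filters; the filters dict is registered on the DAG via user_defined_filters=filters, as the cassandra_monthly and geoeditors hunks show. A minimal sketch of how such a filter mapping could look, with the filter bodies as assumptions:

# Illustrative only; the real implementations live in wmf_airflow_common.templates.time_filters.
from datetime import datetime, timedelta

def add_days(dt: datetime, days: int) -> datetime:
    return dt + timedelta(days=days)

def to_ds_nodash(dt: datetime) -> str:
    return dt.strftime('%Y%m%d')

filters = {'add_days': add_days, 'to_ds_nodash': to_ds_nodash}

# Passed as DAG(..., user_defined_filters=filters) so templates like
# '{{ execution_date | add_days(7) | to_ds_nodash }}' resolve at render time.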
......@@ -11,7 +11,7 @@ from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHive
from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.operators.spark import SparkSqlOperator
from wmf_airflow_common.sensors.url import URLSensor
from analytics.config.dag_config import spark3_default_args, hadoop_name_node, hql_directory, artifact
from analytics.config.dag_config import default_args, hadoop_name_node, hql_directory, artifact
dag_id = 'editors_daily_monthly'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -24,7 +24,7 @@ with DAG(
schedule_interval='@monthly',
tags=['spark', 'hql', 'hive', 'geoeditors'],
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(days=10),
}
),
......
......@@ -10,7 +10,7 @@ from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHive
from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.operators.spark import SparkSqlOperator
from wmf_airflow_common.operators.url import URLTouchOperator
from analytics.config.dag_config import spark3_default_args, hql_directory, hadoop_name_node
from analytics.config.dag_config import default_args, hql_directory, hadoop_name_node
dag_id = 'geoeditors_edits_monthly'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -23,7 +23,7 @@ with DAG(
schedule_interval='@monthly',
tags=['spark', 'hql', 'hive', 'geoeditors'],
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(days=10),
}
),
......
......@@ -10,7 +10,7 @@ from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHive
from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.operators.spark import SparkSqlOperator
from wmf_airflow_common.operators.url import URLTouchOperator
from analytics.config.dag_config import spark3_default_args, hql_directory, hadoop_name_node
from analytics.config.dag_config import default_args, hql_directory, hadoop_name_node
dag_id = 'geoeditors_monthly'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -23,7 +23,7 @@ with DAG(
schedule_interval='@monthly',
tags=['spark', 'hql', 'hive', 'geoeditors'],
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(days=10),
}
),
......
......@@ -31,7 +31,7 @@ from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator
from wmf_airflow_common.operators.spark import SparkSqlOperator
from wmf_airflow_common.templates.time_filters import filters
from analytics.config.dag_config import spark3_default_args, hql_directory, hadoop_name_node
from analytics.config.dag_config import default_args, hql_directory, hadoop_name_node
dag_id = 'geoeditors_public_monthly'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -45,7 +45,7 @@ with DAG(
tags=['archive', 'geoeditors', 'monthly', 'public'],
user_defined_filters=filters,
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(days=10),
}
),
......
......@@ -14,7 +14,7 @@ from wmf_airflow_common.operators.hdfs import HDFSArchiveOperator
from wmf_airflow_common.operators.spark import SparkSqlOperator
from wmf_airflow_common.templates.time_filters import filters
from analytics.config.dag_config import spark3_default_args, hql_directory, hadoop_name_node
from analytics.config.dag_config import default_args, hql_directory, hadoop_name_node
dag_id = 'geoeditors_yearly'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -28,7 +28,7 @@ with DAG(
tags=['archive', 'geoeditors', 'yearly'],
user_defined_filters=filters,
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(days=5),
}
),
......
......@@ -13,7 +13,7 @@ from wmf_airflow_common.config.variable_properties import VariableProperties
from wmf_airflow_common.operators.spark import SparkSqlOperator
from wmf_airflow_common.templates.time_filters import filters
from analytics.config.dag_config import spark3_default_args, hql_directory
from analytics.config.dag_config import default_args, hql_directory
dag_id = 'unique_editors_by_country_monthly'
var_props = VariableProperties(f'{dag_id}_config')
......@@ -27,7 +27,7 @@ with DAG(
tags=['geoeditors'],
user_defined_filters=filters,
default_args=var_props.get_merged('default_args',
{ **spark3_default_args,
{ **default_args,
'sla': timedelta(days=10),
}
)
......