Add job for MediaWiki history dumps.
This MR adds an Airflow job equivalent to the Oozie job at https://github.com/wikimedia/analytics-refinery/tree/master/oozie/mediawiki/history/dumps.
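Below is a minimal sketch of what the equivalent Airflow task looks like, assuming the stock `SparkSubmitOperator`; the DAG id, schedule, HDFS jar path, and job arguments are illustrative assumptions, not this MR's actual code (the job class and jar version are the real ones discussed below).

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Artifact pinned per the version notes below; the HDFS path is hypothetical.
REFINERY_JOB_JAR = "hdfs:///wmf/cache/artifacts/airflow/refinery-job-0.1.25-shaded.jar"

with DAG(
    dag_id="mediawiki_history_dumps",  # illustrative DAG id
    start_date=datetime(2022, 8, 1),
    schedule_interval="@monthly",  # history dumps are produced per monthly snapshot
    catchup=False,
) as dag:
    SparkSubmitOperator(
        task_id="dump_mediawiki_history",
        application=REFINERY_JOB_JAR,
        java_class="org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper",
        # Hypothetical arguments; see MediawikiHistoryDumper.scala for the real CLI.
        application_args=["--snapshot", "{{ ds }}"],
    )
```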
A couple of notes on the refinery-job version chosen:

- The Oozie job is running on version `refinery-job-0.0.134.jar`, which is quite old.
- I wanted to bump the dependency so that we can later more easily remove old artifacts. I first tried running the job with the latest Spark3-compatible jar, `refinery-job-0.2.3-shaded.jar`. That failed: Spark 3 is not happy with `org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized.MediawikiEvent` (the `NullPointerException` in `Row.getSeq` below suggests Spark 3 no longer tolerates null array fields the way Spark 2 did).

<details><summary>Stack here</summary>

```
User class threw exception: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:923)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.repartitionData(MediawikiHistoryDumper.scala:208)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.apply(MediawikiHistoryDumper.scala:132)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.main(MediawikiHistoryDumper.scala:111)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper.main(MediawikiHistoryDumper.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 60 in stage 1.0 failed 4 times, most recent failure: Lost task 60.3 in stage 1.0 (TID 11) (an-worker1106.eqiad.wmnet executor 1): java.lang.NullPointerException
at org.apache.spark.sql.Row.getSeq(Row.scala:319)
at org.apache.spark.sql.Row.getSeq$(Row.scala:319)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:166)
at org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized.MediawikiEvent$.fromRow(MediawikiEvent.scala:496)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.$anonfun$repartitionData$2(MediawikiHistoryDumper.scala:182)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
... 31 more
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.Row.getSeq(Row.scala:319)
at org.apache.spark.sql.Row.getSeq$(Row.scala:319)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:166)
at org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized.MediawikiEvent$.fromRow(MediawikiEvent.scala:496)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.$anonfun$repartitionData$2(MediawikiHistoryDumper.scala:182)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```

</details>

- I then tried the latest Spark2-compatible jar, `refinery-job-0.1.27-shaded.jar`. This failed as well: the changes made in https://github.com/wikimedia/analytics-refinery-source/commit/fce97f807f12b7405e5ebe11d6d47ef8a30dbe7c make the job fail with:
<details><summary>Stack here</summary>

```
[2022-08-25 23:00:23,979] {spark_submit.py:523} INFO - Exception in thread "main" java.lang.IllegalArgumentException: Can not create a Path from an empty string
[2022-08-25 23:00:23,980] {spark_submit.py:523} INFO - at org.apache.hadoop.fs.Path.checkPathArg(Path.java:163)
[2022-08-25 23:00:23,980] {spark_submit.py:523} INFO - at org.apache.hadoop.fs.Path.<init>(Path.java:175)
[2022-08-25 23:00:23,980] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$$anonfun$archiveData$1$$anonfun$apply$3.apply(MediawikiHistoryDumper.scala:274)
[2022-08-25 23:00:23,980] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$$anonfun$archiveData$1$$anonfun$apply$3.apply(MediawikiHistoryDumper.scala:262)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$$anonfun$archiveData$1.apply(MediawikiHistoryDumper.scala:262)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$$anonfun$archiveData$1.apply(MediawikiHistoryDumper.scala:259)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.archiveData(MediawikiHistoryDumper.scala:259)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.apply(MediawikiHistoryDumper.scala:135)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.main(MediawikiHistoryDumper.scala:111)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper.main(MediawikiHistoryDumper.scala)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
```

</details>
- I then figured I should try the most recent jar that does not include the change that broke the previous attempt. That was `refinery-job-0.1.25-shaded.jar`, and it ran successfully.
- Just like other recent Airflow jobs, this one should eventually run on Spark 3, but attempting that felt out of scope for this MR. Once a Spark3-compatible jar works, bumping the pinned version should be all that is needed (see the sketch below).
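To make that future bump easy, the artifact version can live in a single constant; a minimal sketch, again with a hypothetical HDFS path:

```python
# Keep the pinned refinery-job version in one place so that bumping to a
# Spark3-compatible release later is a one-line change. Path is hypothetical.
REFINERY_JOB_VERSION = "0.1.25"  # latest version that runs this job successfully
REFINERY_JOB_JAR = (
    "hdfs:///wmf/cache/artifacts/airflow/"
    f"refinery-job-{REFINERY_JOB_VERSION}-shaded.jar"
)
```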