
Add job for MediaWiki history dumps.

Xcollazo requested to merge T300344-mediawiki-history-dumps into main

This MR adds an Airflow job equivalent to the oozie job at https://github.com/wikimedia/analytics-refinery/tree/master/oozie/mediawiki/history/dumps.
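For orientation, here is a minimal sketch of what such a DAG can look like with the stock SparkSubmitOperator. The dag_id, schedule, jar path, and job arguments are illustrative assumptions, not the actual implementation; the real DAG follows this repo's own conventions and operator wrappers.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Hypothetical artifact location; the repo resolves artifacts its own way.
REFINERY_JOB_JAR = "hdfs:///wmf/refinery/artifacts/refinery-job-0.1.25-shaded.jar"

with DAG(
    dag_id="mediawiki_history_dumps",
    start_date=datetime(2022, 8, 1),
    schedule_interval="@monthly",  # one run per MediaWiki history snapshot
    catchup=False,
) as dag:
    SparkSubmitOperator(
        task_id="dump_mediawiki_history",
        application=REFINERY_JOB_JAR,
        java_class="org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper",
        # Placeholder arguments; the real job's parameters are not spelled
        # out in this description.
        application_args=["--snapshot", "{{ ds }}"],
    )
```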

A few notes on the refinery-job version chosen:

  1. The oozie job currently runs on refinery-job-0.0.134.jar, which is quite old.

  2. I wanted to bump the dependency so that we can more easily remove old artifacts later. I first tried running the job with the latest Spark3-compatible refinery-job-0.2.3-shaded.jar. That failed: under Spark3, reconstructing org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized.MediawikiEvent from rows throws a NullPointerException (stack trace below).

Stack here:
User class threw exception: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:923)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.repartitionData(MediawikiHistoryDumper.scala:208)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.apply(MediawikiHistoryDumper.scala:132)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.main(MediawikiHistoryDumper.scala:111)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper.main(MediawikiHistoryDumper.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 60 in stage 1.0 failed 4 times, most recent failure: Lost task 60.3 in stage 1.0 (TID 11) (an-worker1106.eqiad.wmnet executor 1): java.lang.NullPointerException
at org.apache.spark.sql.Row.getSeq(Row.scala:319)
at org.apache.spark.sql.Row.getSeq$(Row.scala:319)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:166)
at org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized.MediawikiEvent$.fromRow(MediawikiEvent.scala:496)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.$anonfun$repartitionData$2(MediawikiHistoryDumper.scala:182)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
... 31 more
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.Row.getSeq(Row.scala:319)
at org.apache.spark.sql.Row.getSeq$(Row.scala:319)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:166)
at org.wikimedia.analytics.refinery.job.mediawikihistory.denormalized.MediawikiEvent$.fromRow(MediawikiEvent.scala:496)
at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.$anonfun$repartitionData$2(MediawikiHistoryDumper.scala:182)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
  3. I then tried the latest Spark2-compatible jar, refinery-job-0.1.27-shaded.jar. This failed as well, due to changes introduced in https://github.com/wikimedia/analytics-refinery-source/commit/fce97f807f12b7405e5ebe11d6d47ef8a30dbe7c that make the job fail with:
Stack here:
[2022-08-25 23:00:23,979] {spark_submit.py:523} INFO - Exception in thread "main" java.lang.IllegalArgumentException: Can not create a Path from an empty string
[2022-08-25 23:00:23,980] {spark_submit.py:523} INFO - at org.apache.hadoop.fs.Path.checkPathArg(Path.java:163)
[2022-08-25 23:00:23,980] {spark_submit.py:523} INFO - at org.apache.hadoop.fs.Path.<init>(Path.java:175)
[2022-08-25 23:00:23,980] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$$anonfun$archiveData$1$$anonfun$apply$3.apply(MediawikiHistoryDumper.scala:274)
[2022-08-25 23:00:23,980] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$$anonfun$archiveData$1$$anonfun$apply$3.apply(MediawikiHistoryDumper.scala:262)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$$anonfun$archiveData$1.apply(MediawikiHistoryDumper.scala:262)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$$anonfun$archiveData$1.apply(MediawikiHistoryDumper.scala:259)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
[2022-08-25 23:00:23,981] {spark_submit.py:523} INFO - at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.archiveData(MediawikiHistoryDumper.scala:259)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.apply(MediawikiHistoryDumper.scala:135)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper$.main(MediawikiHistoryDumper.scala:111)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryDumper.main(MediawikiHistoryDumper.scala)
[2022-08-25 23:00:23,982] {spark_submit.py:523} INFO - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  4. I then figured that I should try the most recent version that does not include the change that makes us fail in (3). That was refinery-job-0.1.25-shaded.jar. This version worked successfully.

  5. Just like other recent Airflow jobs, we should eventually make this work on Spark3, but it felt out of scope to attempt that in this MR (see the version-pinning sketch after this list).
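Relatedly, a hypothetical sketch of pinning the jar version in a single place, so that the eventual Spark3 bump from note 5 is a one-line change. The artifact() helper and the HDFS layout below are assumptions, not this repo's actual API:

```python
# Hypothetical helper; the repo's real artifact resolution may differ.
def artifact(name: str) -> str:
    """Resolve a refinery artifact file name to an HDFS path (assumed layout)."""
    return f"hdfs:///wmf/cache/artifacts/airflow/{name}"

# Last refinery-job version verified to work for this job on Spark2 (note 4).
REFINERY_JOB_JAR = artifact("refinery-job-0.1.25-shaded.jar")

# TODO (note 5): bump to a Spark3-compatible refinery-job (0.2.x+) once the
# NullPointerException seen in MediawikiEvent.fromRow under Spark3 is fixed.
```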
