diff --git a/README.md b/README.md index 09dda791315a831e14bf71520778a1a17ecad679..11a56883230cf456bb5a5cfdde32573a3b697181 100644 --- a/README.md +++ b/README.md @@ -15,3 +15,73 @@ Build a jar with dependencies with ```bash ./mvnw clean package ``` + +# Deploy on YARN + +# Deploy + +A standalone cluster can be setup locally (on a stat machine atop YARN) with +``` +wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz +tar xvzf flink-1.15.0-bin-scala_2.12.tgz +cd flink-1.15.0 +export HADOOP_CLASSPATH=`hadoop classpath` +./bin/yarn-session.sh --detached +``` + +The `package` target can be manually copied to a stat machine with: +```bash +scp target/enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar stat1005.eqiad.wmnet:~/flink-1.15.0 +``` + +Start a Flink cluster on YARN with +``` +export HADOOP_CLASSPATH=`hadoop classpath` +./bin/yarn-session.sh --detached +``` + +Finally launch the job with +```bash +./bin/flink run ./bin/flink run -c org.wikimedia.dataplatform.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar +``` + +## View the output of a Flink job + +On YARN stdout is directed to the container job, and won't be visible from the cli. +We can display container output by accessing its logs with +``` +yarn logs -applicationId -containerId +``` +Where +- `` is the Flink cluster id returned by `yarn-session.sh`, and visible at https://yarn.wikimedia.org. +- `` is the container running a specific task, that you can find in Flink's Task Manager at `https://yarn.wikimedia.org/proxy//#/task-manager`. + +For more details see the [project doc](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/overview/). +The [Flink Web Interface]() will be available at yarn.wikimedia.org under +`https://yarn.wikimedia.org/proxy/`. +# Config + +There's a couple of gotchas. + +## JVM + +We need to rewrite the `Host` HTTP header to properly route HTTP request from the +internal YARN cluster to https://api-ro.discovery.wmnet. + +To do so, we need to configure the JVM http-client to allow restricted headers. + +Add the following to `conf/flink-conf.yaml`: +``` +env.java.opts: -Dsun.net.http.allowRestrictedHeaders=true +``` + +## Kerberos +Kerberos authentication is required to access WMF Analytics resources. +The relevant config settings are found in `conf/flink-conf.yaml`: +```properties +security.kerberos.login.use-ticket-cache: true +# security.kerberos.login.keytab: +security.kerberos.login.principal: krbtgt/WIKIMEDIA@WIKIMEDIA +# The configuration below defines which JAAS login contexts +security.kerberos.login.contexts: Client,KafkaClient +``` \ No newline at end of file diff --git a/pom.xml b/pom.xml index 136157aa6426740ff8e245ce8b176305544b05eb..0d035d604b4457f56adf73f0f25ee592f4534ce1 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ 4.3.0 2.0.2 3.2.12 - 2.12.15 + 2.12.7 @@ -28,6 +28,22 @@ guava 31.0.1-jre + + com.softwaremill.sttp.client3 + async-http-client-backend-future_${scala.compat.version} + 3.6.2 + + + io.netty + netty-transport-native-epoll + + + + + com.softwaremill.sttp.client3 + core_${scala.compat.version} + 3.6.2 + io.circe circe-generic-extras_${scala.compat.version} @@ -63,6 +79,17 @@ org.wikimedia eventutilities 1.0.9 + + + javax.activation + activation + + + + + org.wikimedia.utils + http-client-utils + 1.0.0 org.apache.flink @@ -160,14 +187,12 @@ ${flink.version} test - org.scalatest scalatest_${scala.compat.version} ${scala.scalatest.version} test - @@ -186,6 +211,12 @@ + + + io.github.evis + scalafix-maven-plugin_${scala.compat.version} + 0.1.6_0.10.0 + @@ -306,8 +337,15 @@ org.scalastyle scalastyle-maven-plugin + 1.0.0 + + true + ${project.basedir}/src/main/scala + ${project.basedir}/src/test/scala + ${project.basedir}/scalastyle-output.xml + ${project.reporting.outputEncoding} + - org.scalatest scalatest-maven-plugin diff --git a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala index 158090d8912e0b38e1a167b56da033ca5927f8e6..17c42f13a6986c4805e2c41cddd3866ab0d0ce57 100644 --- a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala +++ b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala @@ -1,10 +1,99 @@ package org.wikimedia.dataplatform +import io.circe.Json +import io.circe.parser._ +import io.circe._ +import io.circe.generic.semiauto._ +import io.circe.generic.extras.auto._ +import io.circe.generic.extras.Configuration +import org.slf4j.LoggerFactory +import org.wikimedia.dataplatform.events.PageCreate +import org.wikimedia.dataplatform.wmfapi.{ActionQuery, AsyncActionRequest} import org.apache.flink.streaming.api.scala._ +import org.wikimedia.dataplatform.enrich.Content +import scala.util.{Failure, Success} +import java.util.concurrent.TimeUnit + +class Enrichment + +/** + * A streaming job that listens for page_create events, + * queries the Action API for page content changes, and + * prints them to stdout. + * + * Note that: + * - Currently it uses the DataStream API to process messages for the page_create. + * Event schema boilerplate is defined in `org.wikimedia.dataplatform.events.PageCreate`. + * - Calls to the Action API are performed in a `AsyncDataStream`. The related Flink `AsyncFunction`, + * and http client, is defined in `wikimedia.dataplatform.AsyncActionRequest`. + * - Some Kafka boilerplate classes and a query builder helper are defined in the `dataplatform` + * package object. + */ object Enrichment { + private val Log = LoggerFactory.getLogger(classOf[Enrichment]) + // Flink application identifier. + private val JobName = "Mediawiki Stream Enrichment" + // Timeout (milliseconds) for an AsyncFunction to complete. + private val AsyncFunctionTimeoutMs = 2000 + // The max number of async i/o operations in progress at the same time in an AsyncDataStream instance. + // This is set to a safe value. It assumes the application is running on a default Flink config with + // - parallelism.default=1 + // - taskmanager.numberOfTaskSlots=1 + // Both can be adjusted in conf/flink.conf. + // See https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/ for more details. + private val AsyncDataStreamCapacity = 12 + + // TODO(gmodena, 2022-05-22) turn back on after we are done implementing / refactoring this class. + // scalastyle:off method.length def main(args: Array[String]): Unit = { + // TODO(gmodena, 2022-05-18): this is a helper for using Circe's decode[PageCreate](message) + // We can drop it once we switch to automatic schema conversion via Table API utils. + implicit val config: Configuration = Configuration.default.withDefaults + implicit val contentEncoder: Encoder[Content] = deriveEncoder[Content] + val env = StreamExecutionEnvironment.getExecutionEnvironment - env.execute + + /** 1. consume page_create and produce `DataStream[ActionQuery]`, that is a + * case class with input parameters to setup the AsyncFunction http request. */ + val pageCreateStream: DataStream[ActionQuery] = env.addSource(PageCreateSource) + .flatMap({ message: String => + Log.debug(message) + decode[PageCreate](message) match { + case Left(error) => + Log.error(error.toString) + None + case Right(pageCreate: PageCreate) => Option(pageCreate) + } + }).map((pageCreate: PageCreate) => { + val uri = buildRevisionContentQueryUri(pageCreate.rev_id) + val headers = Option( + Map("Host" -> pageCreate.meta.domain, + "User-Agent" -> UserAgent)) + Log.debug(s"Preparing to call $uri from Host: ${pageCreate.meta.domain}" ) + ActionQuery(uri, headers) + }) + + /** 2. Query the Action API, parse valid responses, and extract content. */ + val contentStream: DataStream[String] = AsyncDataStream.unorderedWait(pageCreateStream, + new AsyncActionRequest(), + timeout = AsyncFunctionTimeoutMs, + TimeUnit.MILLISECONDS, + AsyncDataStreamCapacity).flatMap({ + message: String => + Log.debug(message) + Content.from(message) match { + case Failure(error) => + Log.error(error.toString) // TODO(gmodena, 2022-05-19): maybe this should go in a side output? + None + case Success(content) => Option(content) + } + }).map((content: Content) => contentEncoder(content).toString) + + contentStream.print + + //val pageDeleteStream: DataStream[String] = env.addSource(PageDelete) + //val revisionCreateStream: DataStream[String] = env.addSource(RevisionCreate) + env.execute(JobName) } } diff --git a/src/main/scala/org/wikimedia/dataplatform/enrich/Content.scala b/src/main/scala/org/wikimedia/dataplatform/enrich/Content.scala new file mode 100644 index 0000000000000000000000000000000000000000..0e605ab210b2935131192d982798c2e7546d3d48 --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/enrich/Content.scala @@ -0,0 +1,58 @@ +package org.wikimedia.dataplatform.enrich + +import io.circe.generic.extras.Configuration +import io.circe.generic.extras.auto._ +import io.circe.parser._ + +import scala.util.Try + +// TODO(gmodena, 2022-05,18): I've been a bit loose with this class. It's meant to +// just show some output while debugging. I uses the same field naming convention +// of the PageCreate schema. +// It will be replaced by a record that implements the output schema. +// See https://gitlab.wikimedia.org/repos/generated-data-platform/topics/mediawiki-stream-enrichment/-/merge_requests/2/diffs?view=parallel +case class Content(title: String, + article_id: Long, + contentmodel: Option[String], + contentformat: Option[String], + content: Option[String]) + +/** Parse the output of Action API's response and extract + * content data. */ +object Content { + // Derives a Circe config for interoperability with encode/decode methods. + // E.g. provides an implicit value for evidence parameter of + // type io.circe.Decoder[ActionQueryResponse] + implicit val config: Configuration = Configuration.default.withDefaults + + private def extractContent(actionQueryResponse: ActionQueryResponse): Content = { + // TODO(gmodena, 2022-05-31) do we need to validate slots? + // Maybe we can push these checks to the Json Decoder. + if (actionQueryResponse.query.pages.isEmpty) { + throw new NoSuchElementException("Page data is missing from payload") + } + + if (actionQueryResponse.query.pages.head.revisions.isEmpty) { + throw new NoSuchElementException("Revision data is missing from payload") + } + + val page = actionQueryResponse.query.pages.head + val revisions = page.revisions.head.slots.main + + new Content( + title = page.title, + article_id = page.pageid, + contentmodel = Option(revisions.contentmodel), + contentformat = Option(revisions.contentformat), + content = Option(revisions.content)) + } + + def from(body: String): Try[Content] = Try { + decode[ActionQueryResponse](body) match { + case Left(error) => + // TODO(gmodena, 2022-05-31): this should trigger a retry. + throw new IllegalArgumentException(error) + case Right(actionQueryResponse: ActionQueryResponse) => extractContent(actionQueryResponse) + } + } +} diff --git a/src/main/scala/org/wikimedia/dataplatform/enrich/Response.scala b/src/main/scala/org/wikimedia/dataplatform/enrich/Response.scala new file mode 100644 index 0000000000000000000000000000000000000000..46d4902095d0366f530921875483b5062acd551d --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/enrich/Response.scala @@ -0,0 +1,39 @@ +package org.wikimedia.dataplatform.enrich + +protected case class Pages( + pageid: Int, + ns: Int, + title: String, + revisions: Seq[Revisions] + ) + +protected case class Query( + pages: Seq[Pages] + ) + +protected case class Revisions( + slots: Slots + ) + +protected case class Slots( + main: Main + ) + +protected case class Main( + contentmodel: String, + contentformat: String, + content: String + ) +/** + * Encapsulate the payload of a successful action=query request, + * parametrized by the buildRevisionContentQueryUri() helper. + * + * @param batchcomplete + * @param query + */ +case class ActionQueryResponse( + batchcomplete: Boolean, + query: Query + ) + + diff --git a/src/main/scala/org/wikimedia/dataplatform/events/PageCreate.scala b/src/main/scala/org/wikimedia/dataplatform/events/PageCreate.scala new file mode 100644 index 0000000000000000000000000000000000000000..70ccc7be6407e6de43e610617ebff41c75028bfa --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/events/PageCreate.scala @@ -0,0 +1,34 @@ +package org.wikimedia.dataplatform.events + +case class Meta(uri: String, + request_id: String, + id: String, + dt: String, + domain: String, + stream: String) + +case class Performer( + user_text: Option[String] = None, + user_groups: Option[List[String]] = None, + user_is_bot: Option[Boolean] = None, + user_id: Option[Long] = None, + user_registration_dt: Option[String] = None, + user_edit_count: Option[Long] = None + ) + +case class PageCreate(meta: Meta, + database: String, + page_id: Long, + page_title: String, + page_namespace: Int, + rev_id: Long, + rev_timestamp: String, + rev_sha1: String, + rev_minor_edit: Boolean, + rev_len: Int, + rev_content_model: String, + rev_content_format: String, + performer: Option[Performer] = None, + page_is_redirect: Boolean, + comment: Option[String], + parsedcomment: Option[String]) diff --git a/src/main/scala/org/wikimedia/dataplatform/package.scala b/src/main/scala/org/wikimedia/dataplatform/package.scala new file mode 100644 index 0000000000000000000000000000000000000000..1ae4470b08337f35c6d2745988b4d46a04060ff6 --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/package.scala @@ -0,0 +1,54 @@ +package org.wikimedia + +import org.apache.flink.api.common.serialization.SimpleStringSchema +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer +import org.wikimedia.dataplatform.wmfapi.ActionQueryBuilder + +import scala.collection.immutable.ListMap +import java.util.Properties + +/** This package object contains a few utilities that don't have a well defined place yet. */ +package object dataplatform { + protected type KakfaSource = FlinkKafkaConsumer[String] + + // Broadcast to Mediawiki that Mediawiki Stream Enrichment requests are non-human traffic. + val UserAgent = "wmf-mediawiki-stream-enrichment/1.0-SNAPSHOT bot" + + object KafkaConfig { + val properties = new Properties() + val KafkaDataCenter = "eqiad" + val KafkaBootstrapServers = s"kafka-jumbo1001.${KafkaDataCenter}.wmnet:9092" + val KafkaGroupId = "platform-event-driven-poc" + properties.setProperty("bootstrap.servers", KafkaBootstrapServers); + properties.setProperty("group.id", KafkaGroupId) + } + + private val TopicPrefix = s"${KafkaConfig.KafkaDataCenter}.mediawiki" + + val PageCreateSource: KakfaSource = new FlinkKafkaConsumer[String](s"${TopicPrefix}.page-create", + new SimpleStringSchema(), + KafkaConfig.properties) + + val PageDeleteSource = new FlinkKafkaConsumer[String](s"${TopicPrefix}.page-delete", + new SimpleStringSchema(), + KafkaConfig.properties) + + val RevisionCreateSource: KakfaSource = new FlinkKafkaConsumer[String](s"${TopicPrefix}.revision-create", + new SimpleStringSchema(), + KafkaConfig.properties) + + /** An helper method query the Action API and retrive revision content + * @param revisionId the target page revision */ + def buildRevisionContentQueryUri(revisionId: Long): String = { + val params: Map[String, String] = ListMap( + "action" -> "query", + "format" -> "json", + "formatversion" -> "2", + "prop" -> "revisions", + "revids" -> revisionId.toString, + "rvprop" -> "content", + "rvslots" -> "main") + + ActionQueryBuilder().addParams(params).toString() + } +} diff --git a/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala new file mode 100644 index 0000000000000000000000000000000000000000..3eeacc48011192340307b913fb7d50dced911bca --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala @@ -0,0 +1,76 @@ +package org.wikimedia.dataplatform.wmfapi +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.flink.util.concurrent.Executors +import org.slf4j.LoggerFactory +import sttp.client3._ +import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend +import sttp.model.Uri + +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success} + +/** TODO(gmodena, 2022-05-18) we should put a generic component to + * https://gerrit.wikimedia.org/r/plugins/gitiles/wmf-jvm-utils/+/refs/heads/master/http-client-utils/ + * that detects a wmf domain and automatically do the routing to api-ro.discovery.wmnet + * so that api users don't have to bother about about crafting the host header +*/ + +/** Helper class to build query strings for the Action API. */ +class ActionQueryBuilder(private val url: String = "https://api-ro.discovery.wmnet/w/api.php") { + private val endpoint = uri"${url}" + + /** Append query parameters to the `endpoint` url. + * + * @param paramsMap a map of query parameters + * @return the endpoint URL with query string. + */ + def addParams(paramsMap: Map[String, String]): Uri = endpoint.addParams(paramsMap) +} + +object ActionQueryBuilder { + def apply(): ActionQueryBuilder = new ActionQueryBuilder() +} + +/** `ActionQuery` specifies input parameters for a callback to Mediawiki's Action API. + * Namely, the query string and (optionally) the domain name of the server + * + * @param uri the target URL containing and endpoint and query string parameters. + * @param headers additional HTTP headers (eg. Host: domain). When using the internal + * https://api-ro.discovery.wmnet/w/api.php endpoint, `headers` should contain `Host`. + * + * TODO(gmodena, 2022-05-20) it maybe makes more sense to have uri be a sttp.model.Uri, + * and avoid multiple conversions. Unfortunately, the type is not serializable in DataStream. + * Sticking to String for now. + */ +case class ActionQuery(uri: String, headers: Option[Map[String, String]]) + +/** An asynchronous Flink call back that performs an async query to the Mediawiki Action API. */ +class AsyncActionRequest extends AsyncFunction[ActionQuery, String] { + private val Log = LoggerFactory.getLogger(classOf[AsyncActionRequest]) + /** There exists only one instance of the AsyncFunction and it is called sequentially + * for each record in the respective partition of the stream. Unless the asyncInvoke(...) + * method returns fast and relies on a callback (by the client), it will not result in proper asynchronous I/O. + * + * Therefore we query Mediawiki with an async http client. + */ + private lazy val backend = AsyncHttpClientFutureBackend() + + override def asyncInvoke(input: ActionQuery, resultFuture: ResultFuture[String]): Unit = { + val headersMap = input.headers.getOrElse(Map()) + val request = basicRequest.headers(headersMap).get(uri"${input.uri}") + Log.debug(s"uri ${request.uri.toString()}\nheaders ${request.headers.toString}") + + request.send(backend).onComplete { + case Success(response) => + response.body match { + case Left(error) => + Log.error(error.toString) + resultFuture.complete(Iterable()) + case Right(body) => resultFuture.complete(Iterable(body.toString)) + } + case Failure(error) => + Log.error(error.toString) + resultFuture.complete(Iterable()) + }(ExecutionContext.global) + } +} diff --git a/src/test/scala/ContentSuite.scala b/src/test/scala/ContentSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..799dab08fd1580c5f831039717c8b9454ab47c56 --- /dev/null +++ b/src/test/scala/ContentSuite.scala @@ -0,0 +1,101 @@ +import org.scalatest.flatspec.AnyFlatSpec +import org.wikimedia.dataplatform.buildRevisionContentQueryUri +import org.wikimedia.dataplatform.enrich.Content + +// scalastyle:off line.size.limit +class ContentSuite extends AnyFlatSpec { + + "Content" should "be extracted from a json payload" in { + + // generated by + // curl "https://en.wikipedia.org/w/api.php?action=query&format=json&prop=revisions&revids=1088481788&formatversion=2&rvprop=content&rvslots=main" + val jsonString = + """ + |{ + | "batchcomplete": true, + | "query": { + | "pages": [ + | { + | "pageid": 70812160, + | "ns": 14, + | "title": "Category:Works and transport ministers of Namibia", + | "revisions": [ + | { + | "slots": { + | "main": { + | "contentmodel": "wikitext", + | "contentformat": "text/x-wiki", + | "content": "[[Category:Transport ministers by country|Namibia]]\n[[Category:Government ministers of Namibia]]" + | } + | } + | } + | ] + | } + | ] + | } + |}""".stripMargin + + val expected = Content( + title="Category:Works and transport ministers of Namibia", + article_id = 70812160, + contentmodel = Option("wikitext"), + contentformat = Option("text/x-wiki"), + content = Option("[[Category:Transport ministers by country|Namibia]]\n[[Category:Government ministers of Namibia]]")) + + val content = Content.from(jsonString).get + + assert (expected === content) + } + + "A json payload with no pages" should "result in a parsing error" in { + val jsonString = + """ + |{ + | "batchcomplete": true, + | "query": { + | "pages": [] + | } + |}""".stripMargin + + val thrown = intercept[NoSuchElementException] { + Content.from(jsonString).get + } + assert ("Page data is missing from payload" === thrown.getMessage) + } + + "A json payload with no revisions" should "result in a parsing error" in { + val jsonString = + """ + |{ + | "batchcomplete": true, + | "query": { + | "pages": [ + | { + | "pageid": 70812160, + | "ns": 14, + | "title": "Category:Works and transport ministers of Namibia", + | "revisions": [] + | } + | ] + | } + |}""".stripMargin + + val thrown = intercept[NoSuchElementException] { + Content.from(jsonString).get + } + assert ("Revision data is missing from payload" === thrown.getMessage) + } + + "A query string" should "be generated for a page and revision id" in { + // By default we query api-ro.discovery. + val url = "api-ro.discovery.wmnet/w/api.php" + val revisionId = 1088481788L + val formatVersion = 2 + // Query params used to generated the Content test case. + val params = s"""action=query&format=json&formatversion=${formatVersion}&prop=revisions&revids=${revisionId}&rvprop=content&rvslots=main""" + val expectedUri = s"https://$url?$params" + + val actionQueryUri = buildRevisionContentQueryUri(revisionId) + assert (expectedUri === actionQueryUri) + } +} diff --git a/src/test/scala/EnrichmentSuite.scala b/src/test/scala/EnrichmentSuite.scala index bf20ac1f80974341d09e8feb33835541df1422d0..250db05065ae9cfa48b1bdf2418bc3d40841b773 100644 --- a/src/test/scala/EnrichmentSuite.scala +++ b/src/test/scala/EnrichmentSuite.scala @@ -15,7 +15,7 @@ class EnrichmentIntegrationTest extends AnyFlatSpec with Matchers with BeforeAnd .setNumberSlotsPerTaskManager(2) .setNumberTaskManagers(1) .build) - + val JobName = "MediawikiStreamEnrichmentMiniCluster" before { flinkCluster.before() } @@ -40,7 +40,7 @@ class EnrichmentIntegrationTest extends AnyFlatSpec with Matchers with BeforeAnd .addSink(new CollectSink()) // execute - env.execute() + env.execute(JobName) // verify your results CollectSink.values should contain allOf (1L, 21L, 22L) diff --git a/src/test/scala/JsonSerDeSuite.scala b/src/test/scala/JsonSerDeSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..1c301d45290823507fa2d7435364ab9543894d27 --- /dev/null +++ b/src/test/scala/JsonSerDeSuite.scala @@ -0,0 +1,64 @@ +import org.scalatest.flatspec.AnyFlatSpec +import io.circe.parser._ +import io.circe.generic.extras.Configuration +import io.circe.generic.extras.auto._ +import org.wikimedia.dataplatform.events.PageCreate + +class JsonSerDeSuite extends AnyFlatSpec { + implicit val config: Configuration = Configuration.default.withDefaults + + "page_create events" should "be decoded into a case class" in { + // scalastyle:off line.length + val jsonString = """ + |{ + | "$schema": "/mediawiki/revision/create/1.1.0", + | "meta": { + | "uri": "https://www.wikidata.org/wiki/Q112074657", + | "request_id": "272b1299-9c16-4e77-93dd-ff54ea63d9fe", + | "id": "502c8a6d-7a72-4ef2-a379-932e50b24f67", + | "dt": "2022-05-19T15:17:04Z", + | "domain": "www.wikidata.org", + | "stream": "mediawiki.page-create" + | }, + | "database": "wikidatawiki", + | "page_id": 106985263, + | val jsonString = \"\"\"{"$schema":"/mediawiki/revision/create/1.1.0", + | "page_namespace": 0, + | "rev_id": 1642380248, + | "rev_timestamp": "2022-05-19T15:17:04Z", + | "rev_sha1": "cvs6dh4poov6gmle643sd2yoyaimi9w", + | "rev_minor_edit": false, + | "rev_len": 330, + | "rev_content_model": "wikibase-item", + | "rev_content_format": "application/json", + | "performer": { + | "user_text": "Moebeus", + | "user_groups": [ + | "*", + | "user", + | "autoconfirmed" + | ], + | "user_is_bot": false, + | "user_id": 2848912, + | "user_registration_dt": "2017-07-05T16:37:58Z", + | "user_edit_count": 2824907 + | }, + | "page_is_redirect": false, + | "comment": "/* wbeditentity-create:2|en */ Chris Botti discography, Wikimedia artist discography", + | "parsedcomment": + | "wbeditentity-create:2|en: + | Chris Botti discography, Wikimedia artist discography", + | "rev_slots": { + | "main": { + | "rev_slot_content_model": "wikibase-item", + | "rev_slot_sha1": "cvs6dh4poov6gmle643sd2yoyaimi9w", + | "rev_slot_size": 330, + | "rev_slot_origin_rev_id": 1642380248 + | } + | } + |}""" + + val decoded = decode[PageCreate](jsonString).getOrElse(None) + assert (decoded != null) + } +}