From 6615781eea0f2ad50cf8c3f263014e9e5ab88cca Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Sun, 22 May 2022 10:57:49 +0200 Subject: [PATCH 01/11] Initial parsing logic --- README.md | 70 ++++++++++++++ pom.xml | 44 ++++++++- .../org/wikimedia/dataplatform/Content.scala | 69 ++++++++++++++ .../wikimedia/dataplatform/Enrichment.scala | 91 ++++++++++++++++++- .../dataplatform/events/PageCreate.scala | 34 +++++++ .../org/wikimedia/dataplatform/package.scala | 56 ++++++++++++ .../wmfapi/AsyncActionRequest.scala | 78 ++++++++++++++++ src/test/scala/ContentSuite.scala | 71 +++++++++++++++ src/test/scala/EnrichmentSuite.scala | 4 +- src/test/scala/JsonSerDeSuite.scala | 61 +++++++++++++ 10 files changed, 571 insertions(+), 7 deletions(-) create mode 100644 src/main/scala/org/wikimedia/dataplatform/Content.scala create mode 100644 src/main/scala/org/wikimedia/dataplatform/events/PageCreate.scala create mode 100644 src/main/scala/org/wikimedia/dataplatform/package.scala create mode 100644 src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala create mode 100644 src/test/scala/ContentSuite.scala create mode 100644 src/test/scala/JsonSerDeSuite.scala diff --git a/README.md b/README.md index 09dda79..11a5688 100644 --- a/README.md +++ b/README.md @@ -15,3 +15,73 @@ Build a jar with dependencies with ```bash ./mvnw clean package ``` + +# Deploy on YARN + +# Deploy + +A standalone cluster can be setup locally (on a stat machine atop YARN) with +``` +wget https://dlcdn.apache.org/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz +tar xvzf flink-1.15.0-bin-scala_2.12.tgz +cd flink-1.15.0 +export HADOOP_CLASSPATH=`hadoop classpath` +./bin/yarn-session.sh --detached +``` + +The `package` target can be manually copied to a stat machine with: +```bash +scp target/enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar stat1005.eqiad.wmnet:~/flink-1.15.0 +``` + +Start a Flink cluster on YARN with +``` +export HADOOP_CLASSPATH=`hadoop classpath` +./bin/yarn-session.sh --detached +``` + +Finally launch the job with +```bash +./bin/flink run ./bin/flink run -c org.wikimedia.dataplatform.Enrichment enrichment-1.0-SNAPSHOT-jar-with-dependencies.jar +``` + +## View the output of a Flink job + +On YARN stdout is directed to the container job, and won't be visible from the cli. +We can display container output by accessing its logs with +``` +yarn logs -applicationId -containerId +``` +Where +- `` is the Flink cluster id returned by `yarn-session.sh`, and visible at https://yarn.wikimedia.org. +- `` is the container running a specific task, that you can find in Flink's Task Manager at `https://yarn.wikimedia.org/proxy//#/task-manager`. + +For more details see the [project doc](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/overview/). +The [Flink Web Interface]() will be available at yarn.wikimedia.org under +`https://yarn.wikimedia.org/proxy/`. +# Config + +There's a couple of gotchas. + +## JVM + +We need to rewrite the `Host` HTTP header to properly route HTTP request from the +internal YARN cluster to https://api-ro.discovery.wmnet. + +To do so, we need to configure the JVM http-client to allow restricted headers. + +Add the following to `conf/flink-conf.yaml`: +``` +env.java.opts: -Dsun.net.http.allowRestrictedHeaders=true +``` + +## Kerberos +Kerberos authentication is required to access WMF Analytics resources. +The relevant config settings are found in `conf/flink-conf.yaml`: +```properties +security.kerberos.login.use-ticket-cache: true +# security.kerberos.login.keytab: +security.kerberos.login.principal: krbtgt/WIKIMEDIA@WIKIMEDIA +# The configuration below defines which JAAS login contexts +security.kerberos.login.contexts: Client,KafkaClient +``` \ No newline at end of file diff --git a/pom.xml b/pom.xml index 136157a..e23c31a 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,16 @@ guava 31.0.1-jre + + com.softwaremill.sttp.client3 + async-http-client-backend-future_${scala.compat.version} + 3.6.2 + + + com.softwaremill.sttp.client3 + core_${scala.compat.version} + 3.6.2 + io.circe circe-generic-extras_${scala.compat.version} @@ -64,6 +74,11 @@ eventutilities 1.0.9 + + org.wikimedia.utils + http-client-utils + 1.0.0 + org.apache.flink flink-clients @@ -160,14 +175,12 @@ ${flink.version} test - org.scalatest scalatest_${scala.compat.version} ${scala.scalatest.version} test - @@ -186,6 +199,12 @@ + + + io.github.evis + scalafix-maven-plugin_${scala.compat.version} + 0.1.6_0.10.0 + @@ -284,6 +303,15 @@ org.scala-lang scala-library + + javax.activation + activation + 1.1 + + + io.netty + netty-transport-native-epoll + org.apache.flink flink-runtime @@ -306,8 +334,18 @@ org.scalastyle scalastyle-maven-plugin + 1.0.0 + + false + true + true + false + ${project.basedir}/src/main/scala + ${project.basedir}/src/test/scala + ${project.basedir}/scalastyle-output.xml + UTF-8 + - org.scalatest scalatest-maven-plugin diff --git a/src/main/scala/org/wikimedia/dataplatform/Content.scala b/src/main/scala/org/wikimedia/dataplatform/Content.scala new file mode 100644 index 0000000..0dcef21 --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/Content.scala @@ -0,0 +1,69 @@ +package org.wikimedia.dataplatform + +import io.circe.Json +import io.circe.generic.semiauto.deriveCodec + +import scala.util.Try + +// TODO(gmodena, 2022-05,18): I've been a bit loose with this class. It's meant to +// just show some output while debugging. I uses the same field naming convention +// of the PageCreate schema. +// It will be replaced by a record that implements the output schema. +// See https://gitlab.wikimedia.org/repos/generated-data-platform/topics/mediawiki-stream-enrichment/-/merge_requests/2/diffs?view=parallel +case class Content(title: String, + article_id: Long, + contentmodel: Option[String], + contentformat: Option[String], + content: Option[String]) + +/** Parse the output of Action API's response and extract + * content data. */ +object Content { + // Dervices a Circe coded for this class for interoperability with encode/decode methods. + implicit val codec = deriveCodec[Content] + + // FIXME: scala might not like pattern matching on variables. + private def getStringField(json: Json, key: String): Option[String] = { + (json findAllByKey key).collectFirst({ + case field: Any => field.asString + case _ => None + }).flatten + } + + // FIXME: scala might not like pattern matching on variables. + private def getLongField(json: Json, key: String): Option[Long] = { + (json findAllByKey key).collectFirst({ + case field: Any => field.asNumber + case _ => None + }).flatten.flatMap(a => a.toLong) + } + + /** Parse an Action API response and extract `Content` object. + * + * @param body response body returned by `GetRevisionContent`. + * */ + def from(body: Json): Try[Content] = Try { + // FIXME(gmodena, 2022-05-20): Keys like "*" or numercial IDs in the nested arraylist break auto-mappings. + // For now we search for revision content directly on the Json object. + val revisions = (body findAllByKey "revisions") match { + case Nil => List() + case List() => List() + case revisions: List[Json] => revisions.head findAllByKey "main" // Assumes we retrieve the main slot + } + + val title = getStringField(body, "title") + val articleId = getLongField(body, "pageid") + val revisionJson = revisions.head + val contentModel = getStringField(revisionJson, "contentmodel") + val contentFormat = getStringField(revisionJson, "contentformat") + val content = getStringField(revisionJson, "*") + + new Content( + title = title.get, + article_id = articleId.get, + contentmodel = contentModel, + contentformat = contentFormat, + content = content + ) + } +} diff --git a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala index 158090d..a3acf27 100644 --- a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala +++ b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala @@ -1,10 +1,97 @@ package org.wikimedia.dataplatform -import org.apache.flink.streaming.api.scala._ +//import org.apache.flink.streaming.api.scala._ +import io.circe.Json +import io.circe.parser._ +import io.circe._ +import io.circe.generic.semiauto._ +import org.slf4j.LoggerFactory +import org.wikimedia.dataplatform.events.PageCreate +import org.wikimedia.dataplatform.wmfapi.{ActionQuery, AsyncActionRequest} +import scala.util.{Failure, Success} +import java.util.concurrent.TimeUnit + +class Enrichment + +/** + * A streaming job that listens for page_create events, + * queries the Action API for page content changes, and + * prints them to stdout. + * + * Note that: + * - Currently it uses the DataStream API to process messages for the page_create. + * Event schema boilerplate is defined in `org.wikimedia.dataplatform.events.PageCreate`. + * - Calls to the Action API are performed in a `AsyncDataStream`. The related Flink `AsyncFunction`, + * and http client, is defined in `wikimedia.dataplatform.AsyncActionRequest`. + * - Some Kafka boilerplate classes and a query builder helper are defined in the `dataplatform` + * package object. + */ object Enrichment { + private val Log = LoggerFactory.getLogger(classOf[Enrichment]) + private val JobName = "Mediawiki Stream Enrichment" + private val QueryTimeoutMs = 2000 + private val AsyncDataStreamCapacity = 12 + def main(args: Array[String]): Unit = { + import io.circe.generic.extras.Configuration + import io.circe.generic.extras.auto._ + import org.apache.flink.streaming.api.scala._ + // TODO(gmodena, 2022-05-18): this is a helper for using Circe's decode[PageCreate](message) + // We can drop it once we switch to automatic schema conversion via Table API utils. + implicit val config: Configuration = Configuration.default.withDefaults + implicit val contentEncoder: Encoder[Content] = deriveEncoder[Content] + val env = StreamExecutionEnvironment.getExecutionEnvironment - env.execute + + /** 1. consume page_create and produce `DataStream[ActionQuery]`, that is a + * case class with input parameters to setup the AsyncFunction http request. */ + val pageCreateStream: DataStream[ActionQuery] = env.addSource(PageCreateSource) + .flatMap({ message: String => + Log.debug(message) + decode[PageCreate](message) match { + case Left(error) => + Log.error(error.toString) + None + case Right(pageCreate: PageCreate) => Option(pageCreate) + } + }).map((pageCreate: PageCreate) => { + val uri = buildRevisionContentQueryUri(pageCreate.page_title, pageCreate.rev_id) + val headers = Option( + Map("Host" -> pageCreate.meta.domain, + "User-Agent" -> UserAgent)) + Log.debug(s"Preparing to call $uri from Host: ${pageCreate.meta.domain}" ) + ActionQuery(uri, headers) + }) + + /** 2. Query the Action API, parse valid responses, and extract content. */ + val contentStream: DataStream[String] = AsyncDataStream.unorderedWait(pageCreateStream, + new AsyncActionRequest(), + timeout = QueryTimeoutMs, + TimeUnit.MILLISECONDS, + AsyncDataStreamCapacity).flatMap({ + message: String => + Log.debug(message) + parse(message) match { + case Left(error) => + Log.error(error.toString) + None + case Right(payload) => Option(payload) + } + }).map((payload: Json) => Content.from(payload) match { + case Failure(error) => + Log.error { + error.toString + } + error.toString // TODO(gmodena, 2022-05-19): maybe this should go in a side output? + case Success(content) => contentEncoder(content).toString + }) + + contentStream.print + + //val pageDeleteStream: DataStream[String] = env.addSource(PageDelete) + //val revisionCreateStream: DataStream[String] = env.addSource(RevisionCreate) + + env.execute(JobName) } } diff --git a/src/main/scala/org/wikimedia/dataplatform/events/PageCreate.scala b/src/main/scala/org/wikimedia/dataplatform/events/PageCreate.scala new file mode 100644 index 0000000..70ccc7b --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/events/PageCreate.scala @@ -0,0 +1,34 @@ +package org.wikimedia.dataplatform.events + +case class Meta(uri: String, + request_id: String, + id: String, + dt: String, + domain: String, + stream: String) + +case class Performer( + user_text: Option[String] = None, + user_groups: Option[List[String]] = None, + user_is_bot: Option[Boolean] = None, + user_id: Option[Long] = None, + user_registration_dt: Option[String] = None, + user_edit_count: Option[Long] = None + ) + +case class PageCreate(meta: Meta, + database: String, + page_id: Long, + page_title: String, + page_namespace: Int, + rev_id: Long, + rev_timestamp: String, + rev_sha1: String, + rev_minor_edit: Boolean, + rev_len: Int, + rev_content_model: String, + rev_content_format: String, + performer: Option[Performer] = None, + page_is_redirect: Boolean, + comment: Option[String], + parsedcomment: Option[String]) diff --git a/src/main/scala/org/wikimedia/dataplatform/package.scala b/src/main/scala/org/wikimedia/dataplatform/package.scala new file mode 100644 index 0000000..e130f5e --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/package.scala @@ -0,0 +1,56 @@ +package org.wikimedia + +import org.apache.flink.api.common.serialization.SimpleStringSchema +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer +import org.wikimedia.dataplatform.wmfapi.ActionQueryBuilder + +import scala.collection.immutable.ListMap +import java.util.Properties + +/** This package object contains a few utilities that don't have a well defined place yet. */ +package object dataplatform { + protected type KakfaSource = FlinkKafkaConsumer[String] + + // Broadcast to Mediawiki that Mediawiki Stream Enrichment requests are non-human traffic. + val UserAgent = "wmf-mediawiki-stream-enrichment/1.0-SNAPSHOT bot" + + object KafkaConfig { + val properties = new Properties() + val KafkaDataCenter = "eqiad" + val KafkaBootstrapServers = s"kafka-jumbo1001.${KafkaDataCenter}.wmnet:9092" + val KafkaGroupId = "platform-event-driven-poc" + properties.setProperty("bootstrap.servers", KafkaBootstrapServers); + properties.setProperty("group.id", KafkaGroupId) + } + + private val TopicPrefix = s"${KafkaConfig.KafkaDataCenter}.mediawiki" + + val PageCreateSource: KakfaSource = new FlinkKafkaConsumer[String](s"${TopicPrefix}.page-create", + new SimpleStringSchema(), + KafkaConfig.properties) + + val PageDeleteSource = new FlinkKafkaConsumer[String](s"${TopicPrefix}.page-delete", + new SimpleStringSchema(), + KafkaConfig.properties) + + val RevisionCreateSource: KakfaSource = new FlinkKafkaConsumer[String](s"${TopicPrefix}.revision-create", + new SimpleStringSchema(), + KafkaConfig.properties) + + /** An helper method query the Action API and retrive revision content + * @param pageTitle the target page title + * @param revisionId the target page revision */ + def buildRevisionContentQueryUri(pageTitle: String, revisionId: Long): String = { + val params: Map[String, String] = ListMap( + "action" -> "query", + "format" -> "json", + "titles" -> pageTitle, + "prop" -> "revisions", + "rvprop" -> "content|ids", + "rvslots" -> "main", + "rvstartid" -> revisionId.toString, + "rvendid" -> revisionId.toString) + + ActionQueryBuilder().addParams(params).toString() + } +} diff --git a/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala new file mode 100644 index 0000000..a3610a4 --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala @@ -0,0 +1,78 @@ +package org.wikimedia.dataplatform.wmfapi +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.flink.util.concurrent.Executors +import org.slf4j.LoggerFactory +import sttp.client3._ +import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend +import sttp.model.Uri + +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success} + +/** TODO(gmodena, 2022-05-18) we should put a generic component to + * https://gerrit.wikimedia.org/r/plugins/gitiles/wmf-jvm-utils/+/refs/heads/master/http-client-utils/ + * that detects a wmf domain and automatically do the routing to api-ro.discovery.wmnet + * so that api users don't have to bother about about crafting the host header +*/ + +/** Helper class to build query strings for the Action API. */ +class ActionQueryBuilder(private val url: String = "https://api-ro.discovery.wmnet/w/api.php") { + private val endpoint = uri"${url}" + + /** Append query parameters to the `endpoint` url. + * + * @param paramsMap a map of query parameters + * @return the endpoint URL with query string. + */ + def addParams(paramsMap: Map[String, String]): Uri = endpoint.addParams(paramsMap) +} + +object ActionQueryBuilder { + def apply(): ActionQueryBuilder = new ActionQueryBuilder() +} + +/** `ActionQuery` specifies input parameters for a callback to Mediawiki's Action API. + * Namely, the query string and (optionally) the domain name of the server + * + * @param uri the target URL containing and endpoint and query string parameters. + * @param headers additional HTTP headers (eg. Host: domain). When using the internal + * https://api-ro.discovery.wmnet/w/api.php endpoint, `headers` should contain `Host`. + * + * TODO(gmodena, 2022-05-20) it maybe makes more sense to have uri be a sttp.model.Uri, + * and avoid multiple conversions. Unfortunately, the type is not serializable in DataStream. + * Sticking to String for now. + */ +case class ActionQuery(uri: String, headers: Option[Map[String, String]]) + +/** An asynchronous Flink call back that performs an async query to the Mediawiki Action API. */ +class AsyncActionRequest extends AsyncFunction[ActionQuery, String] { + private val Log = LoggerFactory.getLogger(classOf[AsyncActionRequest]) + implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) + /** There exists only one instance of the AsyncFunction and it is called sequentially + * for each record in the respective partition of the stream. Unless the asyncInvoke(...) + * method returns fast and relies on a callback (by the client), it will not result in proper asynchronous I/O. + * + * Therefore we query Mediawiki with an async http client. + */ + implicit lazy val backend = AsyncHttpClientFutureBackend() + + override def asyncInvoke(input: ActionQuery, resultFuture: ResultFuture[String]): Unit = { + val headersMap = input.headers.getOrElse(Map()) + val request = basicRequest.headers(headersMap).get(uri"${input.uri}") + Log.debug(s"uri ${request.uri.toString()}\nheaders ${request.headers.toString}") + + request.send(backend).onComplete { + case Success(response) => + //asyncHttpClient.close() + response.body match { + case Left(error) => + Log.error(error.toString) + resultFuture.complete(Iterable()) + case Right(body) => resultFuture.complete(Iterable(body.toString)) + } + case Failure(error) => + Log.error(error.toString) + resultFuture.complete(Iterable()) + }(ExecutionContext.global) + } +} diff --git a/src/test/scala/ContentSuite.scala b/src/test/scala/ContentSuite.scala new file mode 100644 index 0000000..8aed3b4 --- /dev/null +++ b/src/test/scala/ContentSuite.scala @@ -0,0 +1,71 @@ +import org.scalatest.flatspec.AnyFlatSpec +import io.circe._ +import io.circe.parser._ +import org.wikimedia.dataplatform.{Content, buildRevisionContentQueryUri} + +class ContentSuite extends AnyFlatSpec { + + "Content" should "be extracted from a json payload" in { + + // generated by + // curl "https://en.wikipedia.org/w/api.php?action=query&format=json&titles=Category:Works_and_transport_ministers_of_Namibia&prop=revisions&rvprop=content|ids&rvslots=main&rvstartid=1088481788&rvendid=1088481788" + val jsonString =""" + |{ + | "batchcomplete": "", + | "query": { + | "normalized": [ + | { + | "from": "Category:Works_and_transport_ministers_of_Namibia", + | "to": "Category:Works and transport ministers of Namibia" + | } + | ], + | "pages": { + | "70812160": { + | "pageid": 70812160, + | "ns": 14, + | "title": "Category:Works and transport ministers of Namibia", + | "revisions": [ + | { + | "revid": 1088481788, + | "parentid": 0, + | "slots": { + | "main": { + | "contentmodel": "wikitext", + | "contentformat": "text/x-wiki", + | "*": "[[Category:Transport ministers by country|Namibia]]\n[[Category:Government ministers of Namibia]]" + | } + | } + | } + | ] + | } + | } + | } + |}""".stripMargin + + val expected = Content( + title="Category:Works and transport ministers of Namibia", + article_id = 70812160, + contentmodel = Option("wikitext"), + contentformat = Option("text/x-wiki"), + content = Option("[[Category:Transport ministers by country|Namibia]]\n[[Category:Government ministers of Namibia]]")) + + val parsed = parse(jsonString).getOrElse(Json.Null) + val content = Content.from(parsed).get + + assert (expected === content) + } + + "A query string" should "be generated for a page and revision id" in { + // By default we query api-ro.discovery. + val url = "api-ro.discovery.wmnet/w/api.php" + val pageTitle = "Category:Works_and_transport_ministers_of_Namibia" + val revisionId = 1088481788L + + // Query params used to generated the Content test case. + val params = "action=query&format=json&titles=Category:Works_and_transport_ministers_of_Namibia&prop=revisions&rvprop=content%7Cids&rvslots=main&rvstartid=1088481788&rvendid=1088481788" + val expectedUri = s"https://$url?$params" + + val actionQueryUri = buildRevisionContentQueryUri(pageTitle, revisionId) + assert (expectedUri === actionQueryUri) + } +} diff --git a/src/test/scala/EnrichmentSuite.scala b/src/test/scala/EnrichmentSuite.scala index bf20ac1..250db05 100644 --- a/src/test/scala/EnrichmentSuite.scala +++ b/src/test/scala/EnrichmentSuite.scala @@ -15,7 +15,7 @@ class EnrichmentIntegrationTest extends AnyFlatSpec with Matchers with BeforeAnd .setNumberSlotsPerTaskManager(2) .setNumberTaskManagers(1) .build) - + val JobName = "MediawikiStreamEnrichmentMiniCluster" before { flinkCluster.before() } @@ -40,7 +40,7 @@ class EnrichmentIntegrationTest extends AnyFlatSpec with Matchers with BeforeAnd .addSink(new CollectSink()) // execute - env.execute() + env.execute(JobName) // verify your results CollectSink.values should contain allOf (1L, 21L, 22L) diff --git a/src/test/scala/JsonSerDeSuite.scala b/src/test/scala/JsonSerDeSuite.scala new file mode 100644 index 0000000..6f1d036 --- /dev/null +++ b/src/test/scala/JsonSerDeSuite.scala @@ -0,0 +1,61 @@ +import org.scalatest.flatspec.AnyFlatSpec +import io.circe.parser._ +import io.circe.generic.extras.Configuration +import io.circe.generic.extras.auto._ +import org.wikimedia.dataplatform.events.PageCreate + +class JsonSerDeSuite extends AnyFlatSpec { + implicit val config: Configuration = Configuration.default.withDefaults + + "page_create events" should "be decoded into a case class" in { + val jsonString = """ + |{ + | "$schema": "/mediawiki/revision/create/1.1.0", + | "meta": { + | "uri": "https://www.wikidata.org/wiki/Q112074657", + | "request_id": "272b1299-9c16-4e77-93dd-ff54ea63d9fe", + | "id": "502c8a6d-7a72-4ef2-a379-932e50b24f67", + | "dt": "2022-05-19T15:17:04Z", + | "domain": "www.wikidata.org", + | "stream": "mediawiki.page-create" + | }, + | "database": "wikidatawiki", + | "page_id": 106985263, val jsonString = \"\"\"{"$schema":"/mediawiki/revision/create/1.1.0","meta":{"uri":"https://www.wikidata.org/wiki/Q112074657","request_id":"272b1299-9c16-4e77-93dd-ff54ea63d9fe","id":"502c8a6d-7a72-4ef2-a379-932e50b24f67","dt":"2022-05-19T15:17:04Z","domain":"www.wikidata.org","stream":"mediawiki.page-create"},"database":"wikidatawiki","page_id":106985263,"page_title":"Q112074657","page_namespace":0,"rev_id":1642380248,"rev_timestamp":"2022-05-19T15:17:04Z","rev_sha1":"cvs6dh4poov6gmle643sd2yoyaimi9w","rev_minor_edit":false,"rev_len":330,"rev_content_model":"wikibase-item","rev_content_format":"application/json","performer":{"user_text":"Moebeus","user_groups":["*","user","autoconfirmed"],"user_is_bot":false,"user_id":2848912,"user_registration_dt":"2017-07-05T16:37:58Z","user_edit_count":2824907},"page_is_redirect":false,"comment":"/* wbeditentity-create:2|en */ Chris Botti discography, Wikimedia artist discography","parsedcomment":"wbeditentity-create:2|en: Chris Botti discography, Wikimedia artist discography","rev_slots":{"main":{"rev_slot_content_model":"wikibase-item","rev_slot_sha1":"cvs6dh4poov6gmle643sd2yoyaimi9w","rev_slot_size":330,"rev_slot_origin_rev_id":1642380248}}}\"\"\" + | + | "page_namespace": 0, + | "rev_id": 1642380248, + | "rev_timestamp": "2022-05-19T15:17:04Z", + | "rev_sha1": "cvs6dh4poov6gmle643sd2yoyaimi9w", + | "rev_minor_edit": false, + | "rev_len": 330, + | "rev_content_model": "wikibase-item", + | "rev_content_format": "application/json", + | "performer": { + | "user_text": "Moebeus", + | "user_groups": [ + | "*", + | "user", + | "autoconfirmed" + | ], + | "user_is_bot": false, + | "user_id": 2848912, + | "user_registration_dt": "2017-07-05T16:37:58Z", + | "user_edit_count": 2824907 + | }, + | "page_is_redirect": false, + | "comment": "/* wbeditentity-create:2|en */ Chris Botti discography, Wikimedia artist discography", + | "parsedcomment": "wbeditentity-create:2|en: Chris Botti discography, Wikimedia artist discography", + | "rev_slots": { + | "main": { + | "rev_slot_content_model": "wikibase-item", + | "rev_slot_sha1": "cvs6dh4poov6gmle643sd2yoyaimi9w", + | "rev_slot_size": 330, + | "rev_slot_origin_rev_id": 1642380248 + | } + | } + |}""" + + val decoded = decode[PageCreate](jsonString).getOrElse(None) + assert (decoded != null) + } +} -- GitLab From b45ba3696db8bf57df16278adc428c709efefe61 Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Mon, 23 May 2022 21:23:30 +0200 Subject: [PATCH 02/11] Exclude duplicate dependencies. Exclude javax.activation:activation from org.wikimedia:eventutilities and exclude io.netty:netty-transport-native-epoll from com.softwaremill.sttp.client3:async-http-client-backend-future_${scala.compat.version --- pom.xml | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index e23c31a..c3d6861 100644 --- a/pom.xml +++ b/pom.xml @@ -32,6 +32,12 @@ com.softwaremill.sttp.client3 async-http-client-backend-future_${scala.compat.version} 3.6.2 + + + io.netty + netty-transport-native-epoll + + com.softwaremill.sttp.client3 @@ -73,6 +79,12 @@ org.wikimedia eventutilities 1.0.9 + + + javax.activation + activation + + org.wikimedia.utils @@ -303,15 +315,6 @@ org.scala-lang scala-library - - javax.activation - activation - 1.1 - - - io.netty - netty-transport-native-epoll - org.apache.flink flink-runtime -- GitLab From b28ae0ca4de66d1302e060f05545caa34952574a Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Mon, 23 May 2022 22:13:06 +0200 Subject: [PATCH 03/11] Fix scalastyle config. Remove default properties. Set failOnWarning to true. --- pom.xml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index c3d6861..69e96d6 100644 --- a/pom.xml +++ b/pom.xml @@ -339,14 +339,11 @@ scalastyle-maven-plugin 1.0.0 - false - true true - false ${project.basedir}/src/main/scala ${project.basedir}/src/test/scala ${project.basedir}/scalastyle-output.xml - UTF-8 + ${project.reporting.outputEncoding} -- GitLab From a7b2059cebb81bd4a55b82412e06131a51341d43 Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Mon, 23 May 2022 22:14:32 +0200 Subject: [PATCH 04/11] Fix or ignore scalastyle warning. Ignore violations that will likely be refactored. --- .../org/wikimedia/dataplatform/Enrichment.scala | 16 ++++++++++------ src/test/scala/ContentSuite.scala | 3 ++- src/test/scala/JsonSerDeSuite.scala | 9 ++++++--- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala index a3acf27..eb29fe6 100644 --- a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala +++ b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala @@ -1,14 +1,17 @@ package org.wikimedia.dataplatform -//import org.apache.flink.streaming.api.scala._ import io.circe.Json import io.circe.parser._ import io.circe._ import io.circe.generic.semiauto._ +import io.circe.generic.extras.auto._ +import io.circe.generic.extras.Configuration import org.slf4j.LoggerFactory import org.wikimedia.dataplatform.events.PageCreate import org.wikimedia.dataplatform.wmfapi.{ActionQuery, AsyncActionRequest} +import org.apache.flink.streaming.api.scala._ + import scala.util.{Failure, Success} import java.util.concurrent.TimeUnit @@ -33,10 +36,9 @@ object Enrichment { private val QueryTimeoutMs = 2000 private val AsyncDataStreamCapacity = 12 + // TODO(gmodena, 2022-05-22) turn back on after we are done implementing / refactoring this class. + // scalastyle:off method.length def main(args: Array[String]): Unit = { - import io.circe.generic.extras.Configuration - import io.circe.generic.extras.auto._ - import org.apache.flink.streaming.api.scala._ // TODO(gmodena, 2022-05-18): this is a helper for using Circe's decode[PageCreate](message) // We can drop it once we switch to automatic schema conversion via Table API utils. implicit val config: Configuration = Configuration.default.withDefaults @@ -71,7 +73,7 @@ object Enrichment { TimeUnit.MILLISECONDS, AsyncDataStreamCapacity).flatMap({ message: String => - Log.debug(message) + Log.info(message) parse(message) match { case Left(error) => Log.error(error.toString) @@ -84,7 +86,9 @@ object Enrichment { error.toString } error.toString // TODO(gmodena, 2022-05-19): maybe this should go in a side output? - case Success(content) => contentEncoder(content).toString + case Success(content) => + Log.info(contentEncoder(content).toString) + contentEncoder(content).toString }) contentStream.print diff --git a/src/test/scala/ContentSuite.scala b/src/test/scala/ContentSuite.scala index 8aed3b4..b4b216f 100644 --- a/src/test/scala/ContentSuite.scala +++ b/src/test/scala/ContentSuite.scala @@ -3,6 +3,7 @@ import io.circe._ import io.circe.parser._ import org.wikimedia.dataplatform.{Content, buildRevisionContentQueryUri} +// scalastyle:off line.size.limit class ContentSuite extends AnyFlatSpec { "Content" should "be extracted from a json payload" in { @@ -62,7 +63,7 @@ class ContentSuite extends AnyFlatSpec { val revisionId = 1088481788L // Query params used to generated the Content test case. - val params = "action=query&format=json&titles=Category:Works_and_transport_ministers_of_Namibia&prop=revisions&rvprop=content%7Cids&rvslots=main&rvstartid=1088481788&rvendid=1088481788" + val params = """action=query&format=json&titles=Category:Works_and_transport_ministers_of_Namibia&prop=revisions&rvprop=content%7Cids&rvslots=main&rvstartid=1088481788&rvendid=1088481788""" val expectedUri = s"https://$url?$params" val actionQueryUri = buildRevisionContentQueryUri(pageTitle, revisionId) diff --git a/src/test/scala/JsonSerDeSuite.scala b/src/test/scala/JsonSerDeSuite.scala index 6f1d036..1c301d4 100644 --- a/src/test/scala/JsonSerDeSuite.scala +++ b/src/test/scala/JsonSerDeSuite.scala @@ -8,6 +8,7 @@ class JsonSerDeSuite extends AnyFlatSpec { implicit val config: Configuration = Configuration.default.withDefaults "page_create events" should "be decoded into a case class" in { + // scalastyle:off line.length val jsonString = """ |{ | "$schema": "/mediawiki/revision/create/1.1.0", @@ -20,8 +21,8 @@ class JsonSerDeSuite extends AnyFlatSpec { | "stream": "mediawiki.page-create" | }, | "database": "wikidatawiki", - | "page_id": 106985263, val jsonString = \"\"\"{"$schema":"/mediawiki/revision/create/1.1.0","meta":{"uri":"https://www.wikidata.org/wiki/Q112074657","request_id":"272b1299-9c16-4e77-93dd-ff54ea63d9fe","id":"502c8a6d-7a72-4ef2-a379-932e50b24f67","dt":"2022-05-19T15:17:04Z","domain":"www.wikidata.org","stream":"mediawiki.page-create"},"database":"wikidatawiki","page_id":106985263,"page_title":"Q112074657","page_namespace":0,"rev_id":1642380248,"rev_timestamp":"2022-05-19T15:17:04Z","rev_sha1":"cvs6dh4poov6gmle643sd2yoyaimi9w","rev_minor_edit":false,"rev_len":330,"rev_content_model":"wikibase-item","rev_content_format":"application/json","performer":{"user_text":"Moebeus","user_groups":["*","user","autoconfirmed"],"user_is_bot":false,"user_id":2848912,"user_registration_dt":"2017-07-05T16:37:58Z","user_edit_count":2824907},"page_is_redirect":false,"comment":"/* wbeditentity-create:2|en */ Chris Botti discography, Wikimedia artist discography","parsedcomment":"wbeditentity-create:2|en: Chris Botti discography, Wikimedia artist discography","rev_slots":{"main":{"rev_slot_content_model":"wikibase-item","rev_slot_sha1":"cvs6dh4poov6gmle643sd2yoyaimi9w","rev_slot_size":330,"rev_slot_origin_rev_id":1642380248}}}\"\"\" - | + | "page_id": 106985263, + | val jsonString = \"\"\"{"$schema":"/mediawiki/revision/create/1.1.0", | "page_namespace": 0, | "rev_id": 1642380248, | "rev_timestamp": "2022-05-19T15:17:04Z", @@ -44,7 +45,9 @@ class JsonSerDeSuite extends AnyFlatSpec { | }, | "page_is_redirect": false, | "comment": "/* wbeditentity-create:2|en */ Chris Botti discography, Wikimedia artist discography", - | "parsedcomment": "wbeditentity-create:2|en: Chris Botti discography, Wikimedia artist discography", + | "parsedcomment": + | "wbeditentity-create:2|en: + | Chris Botti discography, Wikimedia artist discography", | "rev_slots": { | "main": { | "rev_slot_content_model": "wikibase-item", -- GitLab From 918207ab03955f7862283280108cf78e3ea7178b Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Mon, 23 May 2022 22:26:37 +0200 Subject: [PATCH 05/11] Comment capacity and timeout properites. --- .../org/wikimedia/dataplatform/Enrichment.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala index eb29fe6..1d38fd6 100644 --- a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala +++ b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala @@ -32,8 +32,16 @@ class Enrichment */ object Enrichment { private val Log = LoggerFactory.getLogger(classOf[Enrichment]) + // Flink application identifier. private val JobName = "Mediawiki Stream Enrichment" - private val QueryTimeoutMs = 2000 + // Timeout (milliseconds) for an AsyncFunction to complete. + private val AsyncFunctionTimeoutMs = 2000 + // The max number of async i/o operations in progress at the same time in an AsyncDataStream instance. + // This is set to a safe value. It assumes the application is running on a default Flink config with + // - parallelism.default=1 + // - taskmanager.numberOfTaskSlots=1 + // Both can be adjusted in conf/flink.conf. + // See https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/ for more details. private val AsyncDataStreamCapacity = 12 // TODO(gmodena, 2022-05-22) turn back on after we are done implementing / refactoring this class. @@ -69,7 +77,7 @@ object Enrichment { /** 2. Query the Action API, parse valid responses, and extract content. */ val contentStream: DataStream[String] = AsyncDataStream.unorderedWait(pageCreateStream, new AsyncActionRequest(), - timeout = QueryTimeoutMs, + timeout = AsyncFunctionTimeoutMs, TimeUnit.MILLISECONDS, AsyncDataStreamCapacity).flatMap({ message: String => -- GitLab From f2125f6e1774479ec2597400a2accf2c34ba15dc Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Wed, 25 May 2022 00:30:45 +0200 Subject: [PATCH 06/11] Lower logging verbosity. --- src/main/scala/org/wikimedia/dataplatform/Enrichment.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala index 1d38fd6..e292076 100644 --- a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala +++ b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala @@ -81,7 +81,7 @@ object Enrichment { TimeUnit.MILLISECONDS, AsyncDataStreamCapacity).flatMap({ message: String => - Log.info(message) + Log.debug(message) parse(message) match { case Left(error) => Log.error(error.toString) @@ -95,7 +95,6 @@ object Enrichment { } error.toString // TODO(gmodena, 2022-05-19): maybe this should go in a side output? case Success(content) => - Log.info(contentEncoder(content).toString) contentEncoder(content).toString }) -- GitLab From 4bd448148d7f35b5ca947491d38b93d16115d298 Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Tue, 31 May 2022 11:40:45 +0200 Subject: [PATCH 07/11] Remove dead code --- .../org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala index a3610a4..df353f4 100644 --- a/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala +++ b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala @@ -63,7 +63,6 @@ class AsyncActionRequest extends AsyncFunction[ActionQuery, String] { request.send(backend).onComplete { case Success(response) => - //asyncHttpClient.close() response.body match { case Left(error) => Log.error(error.toString) -- GitLab From 565d7b725307299326eec3c1fdebeca08e0d6e2e Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Tue, 31 May 2022 11:44:51 +0200 Subject: [PATCH 08/11] Make backend val private --- .../org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala index df353f4..74e6b0d 100644 --- a/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala +++ b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala @@ -54,7 +54,7 @@ class AsyncActionRequest extends AsyncFunction[ActionQuery, String] { * * Therefore we query Mediawiki with an async http client. */ - implicit lazy val backend = AsyncHttpClientFutureBackend() + private lazy val backend = AsyncHttpClientFutureBackend() override def asyncInvoke(input: ActionQuery, resultFuture: ResultFuture[String]): Unit = { val headersMap = input.headers.getOrElse(Map()) -- GitLab From eecd1ce13b83a97b16a64b6d7189c9461e81d60d Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Tue, 31 May 2022 11:48:30 +0200 Subject: [PATCH 09/11] Remove executor val. This declaratiopn is not needed, sicne we explicitly pass ExecutionContext.global. --- .../org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala index 74e6b0d..3eeacc4 100644 --- a/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala +++ b/src/main/scala/org/wikimedia/dataplatform/wmfapi/AsyncActionRequest.scala @@ -47,7 +47,6 @@ case class ActionQuery(uri: String, headers: Option[Map[String, String]]) /** An asynchronous Flink call back that performs an async query to the Mediawiki Action API. */ class AsyncActionRequest extends AsyncFunction[ActionQuery, String] { private val Log = LoggerFactory.getLogger(classOf[AsyncActionRequest]) - implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) /** There exists only one instance of the AsyncFunction and it is called sequentially * for each record in the respective partition of the stream. Unless the asyncInvoke(...) * method returns fast and relies on a callback (by the client), it will not result in proper asynchronous I/O. -- GitLab From d2583ad574e2cef9b046e8c84763365ca83f7e3a Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Tue, 31 May 2022 14:41:20 +0200 Subject: [PATCH 10/11] Simplify the Action API query url. Return a payload that can be mapped to a case class. --- .../org/wikimedia/dataplatform/Content.scala | 69 ----------- .../wikimedia/dataplatform/Enrichment.scala | 23 ++-- .../dataplatform/enrich/Content.scala | 58 +++++++++ .../dataplatform/enrich/Response.scala | 39 ++++++ .../org/wikimedia/dataplatform/package.scala | 12 +- src/test/scala/ContentSuite.scala | 113 +++++++++++------- 6 files changed, 180 insertions(+), 134 deletions(-) delete mode 100644 src/main/scala/org/wikimedia/dataplatform/Content.scala create mode 100644 src/main/scala/org/wikimedia/dataplatform/enrich/Content.scala create mode 100644 src/main/scala/org/wikimedia/dataplatform/enrich/Response.scala diff --git a/src/main/scala/org/wikimedia/dataplatform/Content.scala b/src/main/scala/org/wikimedia/dataplatform/Content.scala deleted file mode 100644 index 0dcef21..0000000 --- a/src/main/scala/org/wikimedia/dataplatform/Content.scala +++ /dev/null @@ -1,69 +0,0 @@ -package org.wikimedia.dataplatform - -import io.circe.Json -import io.circe.generic.semiauto.deriveCodec - -import scala.util.Try - -// TODO(gmodena, 2022-05,18): I've been a bit loose with this class. It's meant to -// just show some output while debugging. I uses the same field naming convention -// of the PageCreate schema. -// It will be replaced by a record that implements the output schema. -// See https://gitlab.wikimedia.org/repos/generated-data-platform/topics/mediawiki-stream-enrichment/-/merge_requests/2/diffs?view=parallel -case class Content(title: String, - article_id: Long, - contentmodel: Option[String], - contentformat: Option[String], - content: Option[String]) - -/** Parse the output of Action API's response and extract - * content data. */ -object Content { - // Dervices a Circe coded for this class for interoperability with encode/decode methods. - implicit val codec = deriveCodec[Content] - - // FIXME: scala might not like pattern matching on variables. - private def getStringField(json: Json, key: String): Option[String] = { - (json findAllByKey key).collectFirst({ - case field: Any => field.asString - case _ => None - }).flatten - } - - // FIXME: scala might not like pattern matching on variables. - private def getLongField(json: Json, key: String): Option[Long] = { - (json findAllByKey key).collectFirst({ - case field: Any => field.asNumber - case _ => None - }).flatten.flatMap(a => a.toLong) - } - - /** Parse an Action API response and extract `Content` object. - * - * @param body response body returned by `GetRevisionContent`. - * */ - def from(body: Json): Try[Content] = Try { - // FIXME(gmodena, 2022-05-20): Keys like "*" or numercial IDs in the nested arraylist break auto-mappings. - // For now we search for revision content directly on the Json object. - val revisions = (body findAllByKey "revisions") match { - case Nil => List() - case List() => List() - case revisions: List[Json] => revisions.head findAllByKey "main" // Assumes we retrieve the main slot - } - - val title = getStringField(body, "title") - val articleId = getLongField(body, "pageid") - val revisionJson = revisions.head - val contentModel = getStringField(revisionJson, "contentmodel") - val contentFormat = getStringField(revisionJson, "contentformat") - val content = getStringField(revisionJson, "*") - - new Content( - title = title.get, - article_id = articleId.get, - contentmodel = contentModel, - contentformat = contentFormat, - content = content - ) - } -} diff --git a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala index e292076..17c42f1 100644 --- a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala +++ b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala @@ -9,8 +9,8 @@ import io.circe.generic.extras.Configuration import org.slf4j.LoggerFactory import org.wikimedia.dataplatform.events.PageCreate import org.wikimedia.dataplatform.wmfapi.{ActionQuery, AsyncActionRequest} - import org.apache.flink.streaming.api.scala._ +import org.wikimedia.dataplatform.enrich.Content import scala.util.{Failure, Success} import java.util.concurrent.TimeUnit @@ -66,7 +66,7 @@ object Enrichment { case Right(pageCreate: PageCreate) => Option(pageCreate) } }).map((pageCreate: PageCreate) => { - val uri = buildRevisionContentQueryUri(pageCreate.page_title, pageCreate.rev_id) + val uri = buildRevisionContentQueryUri(pageCreate.rev_id) val headers = Option( Map("Host" -> pageCreate.meta.domain, "User-Agent" -> UserAgent)) @@ -82,27 +82,18 @@ object Enrichment { AsyncDataStreamCapacity).flatMap({ message: String => Log.debug(message) - parse(message) match { - case Left(error) => - Log.error(error.toString) + Content.from(message) match { + case Failure(error) => + Log.error(error.toString) // TODO(gmodena, 2022-05-19): maybe this should go in a side output? None - case Right(payload) => Option(payload) + case Success(content) => Option(content) } - }).map((payload: Json) => Content.from(payload) match { - case Failure(error) => - Log.error { - error.toString - } - error.toString // TODO(gmodena, 2022-05-19): maybe this should go in a side output? - case Success(content) => - contentEncoder(content).toString - }) + }).map((content: Content) => contentEncoder(content).toString) contentStream.print //val pageDeleteStream: DataStream[String] = env.addSource(PageDelete) //val revisionCreateStream: DataStream[String] = env.addSource(RevisionCreate) - env.execute(JobName) } } diff --git a/src/main/scala/org/wikimedia/dataplatform/enrich/Content.scala b/src/main/scala/org/wikimedia/dataplatform/enrich/Content.scala new file mode 100644 index 0000000..0e605ab --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/enrich/Content.scala @@ -0,0 +1,58 @@ +package org.wikimedia.dataplatform.enrich + +import io.circe.generic.extras.Configuration +import io.circe.generic.extras.auto._ +import io.circe.parser._ + +import scala.util.Try + +// TODO(gmodena, 2022-05,18): I've been a bit loose with this class. It's meant to +// just show some output while debugging. I uses the same field naming convention +// of the PageCreate schema. +// It will be replaced by a record that implements the output schema. +// See https://gitlab.wikimedia.org/repos/generated-data-platform/topics/mediawiki-stream-enrichment/-/merge_requests/2/diffs?view=parallel +case class Content(title: String, + article_id: Long, + contentmodel: Option[String], + contentformat: Option[String], + content: Option[String]) + +/** Parse the output of Action API's response and extract + * content data. */ +object Content { + // Derives a Circe config for interoperability with encode/decode methods. + // E.g. provides an implicit value for evidence parameter of + // type io.circe.Decoder[ActionQueryResponse] + implicit val config: Configuration = Configuration.default.withDefaults + + private def extractContent(actionQueryResponse: ActionQueryResponse): Content = { + // TODO(gmodena, 2022-05-31) do we need to validate slots? + // Maybe we can push these checks to the Json Decoder. + if (actionQueryResponse.query.pages.isEmpty) { + throw new NoSuchElementException("Page data is missing from payload") + } + + if (actionQueryResponse.query.pages.head.revisions.isEmpty) { + throw new NoSuchElementException("Revision data is missing from payload") + } + + val page = actionQueryResponse.query.pages.head + val revisions = page.revisions.head.slots.main + + new Content( + title = page.title, + article_id = page.pageid, + contentmodel = Option(revisions.contentmodel), + contentformat = Option(revisions.contentformat), + content = Option(revisions.content)) + } + + def from(body: String): Try[Content] = Try { + decode[ActionQueryResponse](body) match { + case Left(error) => + // TODO(gmodena, 2022-05-31): this should trigger a retry. + throw new IllegalArgumentException(error) + case Right(actionQueryResponse: ActionQueryResponse) => extractContent(actionQueryResponse) + } + } +} diff --git a/src/main/scala/org/wikimedia/dataplatform/enrich/Response.scala b/src/main/scala/org/wikimedia/dataplatform/enrich/Response.scala new file mode 100644 index 0000000..46d4902 --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/enrich/Response.scala @@ -0,0 +1,39 @@ +package org.wikimedia.dataplatform.enrich + +protected case class Pages( + pageid: Int, + ns: Int, + title: String, + revisions: Seq[Revisions] + ) + +protected case class Query( + pages: Seq[Pages] + ) + +protected case class Revisions( + slots: Slots + ) + +protected case class Slots( + main: Main + ) + +protected case class Main( + contentmodel: String, + contentformat: String, + content: String + ) +/** + * Encapsulate the payload of a successful action=query request, + * parametrized by the buildRevisionContentQueryUri() helper. + * + * @param batchcomplete + * @param query + */ +case class ActionQueryResponse( + batchcomplete: Boolean, + query: Query + ) + + diff --git a/src/main/scala/org/wikimedia/dataplatform/package.scala b/src/main/scala/org/wikimedia/dataplatform/package.scala index e130f5e..1ae4470 100644 --- a/src/main/scala/org/wikimedia/dataplatform/package.scala +++ b/src/main/scala/org/wikimedia/dataplatform/package.scala @@ -38,18 +38,16 @@ package object dataplatform { KafkaConfig.properties) /** An helper method query the Action API and retrive revision content - * @param pageTitle the target page title * @param revisionId the target page revision */ - def buildRevisionContentQueryUri(pageTitle: String, revisionId: Long): String = { + def buildRevisionContentQueryUri(revisionId: Long): String = { val params: Map[String, String] = ListMap( "action" -> "query", "format" -> "json", - "titles" -> pageTitle, + "formatversion" -> "2", "prop" -> "revisions", - "rvprop" -> "content|ids", - "rvslots" -> "main", - "rvstartid" -> revisionId.toString, - "rvendid" -> revisionId.toString) + "revids" -> revisionId.toString, + "rvprop" -> "content", + "rvslots" -> "main") ActionQueryBuilder().addParams(params).toString() } diff --git a/src/test/scala/ContentSuite.scala b/src/test/scala/ContentSuite.scala index b4b216f..799dab0 100644 --- a/src/test/scala/ContentSuite.scala +++ b/src/test/scala/ContentSuite.scala @@ -1,7 +1,6 @@ import org.scalatest.flatspec.AnyFlatSpec -import io.circe._ -import io.circe.parser._ -import org.wikimedia.dataplatform.{Content, buildRevisionContentQueryUri} +import org.wikimedia.dataplatform.buildRevisionContentQueryUri +import org.wikimedia.dataplatform.enrich.Content // scalastyle:off line.size.limit class ContentSuite extends AnyFlatSpec { @@ -9,39 +8,32 @@ class ContentSuite extends AnyFlatSpec { "Content" should "be extracted from a json payload" in { // generated by - // curl "https://en.wikipedia.org/w/api.php?action=query&format=json&titles=Category:Works_and_transport_ministers_of_Namibia&prop=revisions&rvprop=content|ids&rvslots=main&rvstartid=1088481788&rvendid=1088481788" - val jsonString =""" - |{ - | "batchcomplete": "", - | "query": { - | "normalized": [ - | { - | "from": "Category:Works_and_transport_ministers_of_Namibia", - | "to": "Category:Works and transport ministers of Namibia" - | } - | ], - | "pages": { - | "70812160": { - | "pageid": 70812160, - | "ns": 14, - | "title": "Category:Works and transport ministers of Namibia", - | "revisions": [ - | { - | "revid": 1088481788, - | "parentid": 0, - | "slots": { - | "main": { - | "contentmodel": "wikitext", - | "contentformat": "text/x-wiki", - | "*": "[[Category:Transport ministers by country|Namibia]]\n[[Category:Government ministers of Namibia]]" - | } - | } - | } - | ] - | } - | } - | } - |}""".stripMargin + // curl "https://en.wikipedia.org/w/api.php?action=query&format=json&prop=revisions&revids=1088481788&formatversion=2&rvprop=content&rvslots=main" + val jsonString = + """ + |{ + | "batchcomplete": true, + | "query": { + | "pages": [ + | { + | "pageid": 70812160, + | "ns": 14, + | "title": "Category:Works and transport ministers of Namibia", + | "revisions": [ + | { + | "slots": { + | "main": { + | "contentmodel": "wikitext", + | "contentformat": "text/x-wiki", + | "content": "[[Category:Transport ministers by country|Namibia]]\n[[Category:Government ministers of Namibia]]" + | } + | } + | } + | ] + | } + | ] + | } + |}""".stripMargin val expected = Content( title="Category:Works and transport ministers of Namibia", @@ -50,23 +42,60 @@ class ContentSuite extends AnyFlatSpec { contentformat = Option("text/x-wiki"), content = Option("[[Category:Transport ministers by country|Namibia]]\n[[Category:Government ministers of Namibia]]")) - val parsed = parse(jsonString).getOrElse(Json.Null) - val content = Content.from(parsed).get + val content = Content.from(jsonString).get assert (expected === content) } + "A json payload with no pages" should "result in a parsing error" in { + val jsonString = + """ + |{ + | "batchcomplete": true, + | "query": { + | "pages": [] + | } + |}""".stripMargin + + val thrown = intercept[NoSuchElementException] { + Content.from(jsonString).get + } + assert ("Page data is missing from payload" === thrown.getMessage) + } + + "A json payload with no revisions" should "result in a parsing error" in { + val jsonString = + """ + |{ + | "batchcomplete": true, + | "query": { + | "pages": [ + | { + | "pageid": 70812160, + | "ns": 14, + | "title": "Category:Works and transport ministers of Namibia", + | "revisions": [] + | } + | ] + | } + |}""".stripMargin + + val thrown = intercept[NoSuchElementException] { + Content.from(jsonString).get + } + assert ("Revision data is missing from payload" === thrown.getMessage) + } + "A query string" should "be generated for a page and revision id" in { // By default we query api-ro.discovery. val url = "api-ro.discovery.wmnet/w/api.php" - val pageTitle = "Category:Works_and_transport_ministers_of_Namibia" val revisionId = 1088481788L - + val formatVersion = 2 // Query params used to generated the Content test case. - val params = """action=query&format=json&titles=Category:Works_and_transport_ministers_of_Namibia&prop=revisions&rvprop=content%7Cids&rvslots=main&rvstartid=1088481788&rvendid=1088481788""" + val params = s"""action=query&format=json&formatversion=${formatVersion}&prop=revisions&revids=${revisionId}&rvprop=content&rvslots=main""" val expectedUri = s"https://$url?$params" - val actionQueryUri = buildRevisionContentQueryUri(pageTitle, revisionId) + val actionQueryUri = buildRevisionContentQueryUri(revisionId) assert (expectedUri === actionQueryUri) } } -- GitLab From a7ad00e465f7382eeaf400c1963f63355644d030 Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Wed, 1 Jun 2022 22:50:35 +0200 Subject: [PATCH 11/11] Downgrade scala to 2.12.7. In this pom we are using Flink's provided Scala APIs. Only scala <= 2.12.7 is supported. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 69e96d6..0d035d6 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ 4.3.0 2.0.2 3.2.12 - 2.12.15 + 2.12.7 -- GitLab