diff --git a/enrichment/pom.xml b/enrichment/pom.xml index 3f6efe7b42b7ce63f12ea2d8975d792844e14398..35ca864c975407b965aa020dbe7294fb588e6326 100644 --- a/enrichment/pom.xml +++ b/enrichment/pom.xml @@ -40,19 +40,6 @@ org.apache.flink flink-connector-kafka - - org.apache.flink - flink-table-api-java-bridge - - - - org.apache.flink - flink-table-planner-loader - - - org.apache.flink - flink-table-runtime - org.scala-lang scala-library diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala index c2a39a05cad3cd099cd119764ebca57b1ce599b8..2b36ab02a34ce72e8441be8c9761eca227975f8c 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala @@ -1,12 +1,14 @@ package org.wikimedia.mediawiki.event.enrichment import io.findify.flink.api.async.{AsyncFunction, ResultFuture} +import org.apache.flink.api.common.typeinfo.Types +import org.apache.flink.api.java.tuple +import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.types.Row import org.slf4j.LoggerFactory -import org.wikimedia.eventutilities.flink.EventStreamRowUtils +import org.wikimedia.eventutilities.flink.EventRowTypeInfo import org.wikimedia.mediawiki.event.enrichment.wmfapi.{AsyncHttpClient, Query} -import java.time.Instant import scala.concurrent.ExecutionContext import scala.language.implicitConversions import scala.util.{Failure, Success} @@ -15,29 +17,27 @@ import scala.util.{Failure, Success} * logic and better error propagation */ class AsyncEnrichWithContent(val actionApiEndpoint: String, - val eventStreamRowUtils: EventStreamRowUtils) - extends AsyncFunction[Row, Either[String, Row]] { + val rowTypeInfo: EventRowTypeInfo) + extends AsyncFunction[Row, tuple.Tuple2[String, Row]] { private val Log = LoggerFactory.getLogger(classOf[AsyncEnrichWithContent]) override def asyncInvoke( event: Row, - resultFuture: ResultFuture[Either[String, Row]] + resultFuture: ResultFuture[tuple.Tuple2[String, Row]] ): Unit = { val change = event.getField("change_type") if (change == ChangeType.PageDelete) { // Delete events do not create a new revision payload. There's not content to retrieve. - try { - val enrichedEvent = enrichWithoutContent(event) - resultFuture.complete(Iterable(Right(enrichedEvent))) - } catch { - case ex: Exception => resultFuture.complete(Iterable(Left(ex.toString))) - } - + collectEvents(Right(enrichWithoutContent(event)), resultFuture) } else { actionApiCallback(event, resultFuture) } } + def outputTypeInfo: TupleTypeInfo[tuple.Tuple2[String, Row]] = { + new TupleTypeInfo[tuple.Tuple2[String, Row]](Types.STRING, rowTypeInfo) + } + // TODO move this out private def makeHttpClientForActionApi(event: Row): Query = { val domain: String = event.getFieldAs[Row]("meta").getFieldAs("domain") @@ -49,9 +49,7 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String, } private def enrichWithoutContent(event: Row) = { - val enrichedEvent = eventStreamRowUtils.projectFrom(event, true) - eventStreamRowUtils.setIngestionTime(enrichedEvent, Instant.now) - enrichedEvent + rowTypeInfo.projectFrom(event, true) } /** Send an async request to the Action API and return an enriched event. @@ -63,25 +61,35 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String, */ private def actionApiCallback( event: Row, - resultFuture: ResultFuture[Either[String, Row]] + resultFuture: ResultFuture[tuple.Tuple2[String, Row]] ): Unit = { AsyncHttpClient .send(makeHttpClientForActionApi(event)) .map(response => { response.body match { case Right(message) => - enrichWithContent(event, message) match { - case Right(enriched) => - resultFuture.complete(Iterable(Right(enriched))) - case Left(error) => - resultFuture.complete(Iterable(Left(error))) - } + collectEvents(enrichWithContent(event, message), resultFuture) case Left(error) => - resultFuture.complete(Iterable(Left(error))) + collectEvents(Left(error), resultFuture) } })(ExecutionContext.global) } + private def collectEvents( + response: Either[String, Row], + resultFuture: ResultFuture[tuple.Tuple2[String, Row]] + ): Unit = { + val transformed = new tuple.Tuple2[String, Row]; + response match { + case Right(message) => + transformed.f1 = message; + case Left(error) => + transformed.f0 = error + transformed.f1 = rowTypeInfo.createEmptyRow() + } + resultFuture.complete(Iterable(transformed)) + } + /** Enrich an event with content retrieved from an Action API query. * * @param event @@ -101,8 +109,8 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String, Left(errorMsg) case Success(content) => // TODO: this is just a stub for testing purposes. Replace when the output schema if finialised https://phabricator.wikimedia.org/T311600 - val enrichedEvent = eventStreamRowUtils.projectFrom(event, true) - enrichedEvent.setField("content", content.content) + val enrichedEvent = rowTypeInfo.projectFrom(event, true) + enrichedEvent.setField("content", content.content.orNull) Right(enrichedEvent) } } diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala index b0302870b1dee582500fdf6092aa5cb4a6434cb3..c5bd7098d4f8f36a9bf06c70e2cf9299af255aad 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala @@ -2,15 +2,15 @@ package org.wikimedia.mediawiki.event.enrichment import io.findify.flink.api._ import org.apache.flink.api.common.eventtime.WatermarkStrategy -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.connector.sink2.Sink +import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.connector.kafka.sink.KafkaSink import org.apache.flink.connector.kafka.source.KafkaSource -import org.apache.flink.streaming.api.functions.sink.SinkFunction -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.types.Row +import org.apache.flink.util.Collector import org.slf4j.LoggerFactory -import org.wikimedia.eventutilities.flink.EventStreamRowUtils +import org.wikimedia.eventutilities.flink.formats.json.KafkaRecordTimestampStrategy import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory import org.wikimedia.mediawiki.event.enrichment.wmfapi.Query @@ -45,8 +45,6 @@ object Enrichment { lazy val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment - lazy val tableEnv: StreamTableEnvironment = - StreamTableEnvironment.create(env.getJavaEnv) /** Returns a DataStream[Row] of the event stream in Kafka Cluster configured KafkaConfig. */ @@ -65,7 +63,7 @@ object Enrichment { eventSource, WatermarkStrategy.noWatermarks(), PageChangeSourceName - )(TypeInformation.of(classOf[Row])) + )(eventSource.getProducedType) } // scalastyle:off null @@ -79,8 +77,7 @@ object Enrichment { schemaVersion, KafkaConfig.BootstrapServers, PageEnrichedTopic, - null, - null + KafkaRecordTimestampStrategy.ROW_EVENT_TIME ) .build() @@ -118,17 +115,10 @@ object Enrichment { * that do not carry revision content, simplfy push the event forward and enrich without any api call. */ val enrichedStream = enrichPageChange(eventStream = source) - + enrichedStream.sinkTo(sink) enrichedStream - .flatMap((f: Either[String, Row]) => { - f match { - case Left(error) => - Log.error(error) - None - case Right(row) => Option(row) - } - })(TypeInformation.of(classOf[Row])) - .sinkTo(sink) + .getSideOutput(ErrorSideOutputTag)(ErrorSideOutputTag.getTypeInfo) + .addSink(LoggerFactory.getLogger("enrichment-errors").error(_)) } private def filterEventsBySize(source: DataStream[Row]): DataStream[Row] = { @@ -162,23 +152,37 @@ object Enrichment { def enrichPageChange(eventStream: DataStream[Row])( implicit actionApiEndpoint: String, eventStreamFactory: EventDataStreamFactory - ): DataStream[Either[String, Row]] = { + ): DataStream[Row] = { val typeInfo = eventStreamFactory.rowTypeInfo( PageEnrichedTopic, PageEnrichedTopicVersion ) - val eventStreamRowUtils = new EventStreamRowUtils(typeInfo) - AsyncDataStream.unorderedWait( + val asyncOp = new AsyncEnrichWithContent( + actionApiEndpoint = actionApiEndpoint, + rowTypeInfo = typeInfo + ) + + val stream = AsyncDataStream.unorderedWait( filterEventsBySize(eventStream), - new AsyncEnrichWithContent( - actionApiEndpoint = actionApiEndpoint, - eventStreamRowUtils = eventStreamRowUtils - ), + asyncOp, timeout = AsyncFunctionTimeoutMs, TimeUnit.MILLISECONDS, AsyncDataStreamCapacity - )(TypeInformation.of(classOf[Either[String, Row]])) + )(asyncOp.outputTypeInfo) + + // Route errors to a side output + stream.process( + (event: Tuple2[String, Row], + context: ProcessFunction[Tuple2[String, Row], Row]#Context, + collector: Collector[Row]) => { + (Option(event.f0), Option(event.f1)) match { + case (Some(error), _) => + context.output(ErrorSideOutputTag, error) + case (None, Some(row)) => collector.collect(row) + } + } + )(typeInfo) } } diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala index 5cb15e6e4894fba605e2ce9cbaf2ed9d24a69fc9..2082325bdcec27e369d9d5cf9860b2a459acd8aa 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala @@ -1,5 +1,7 @@ package org.wikimedia.mediawiki.event +import io.findify.flink.api.OutputTag +import org.apache.flink.api.common.typeinfo.Types import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory import org.wikimedia.mediawiki.event.enrichment.wmfapi.QueryBuilder @@ -30,6 +32,7 @@ package object enrichment { val UserAgent = "wmf-mediawiki-stream-enrichment/1.0-SNAPSHOT bot" val PageChangeTopicPrefix = s"eqiad.mediawiki" val RevisionCreateWithContentPrefix = "mediawiki" + val ErrorSideOutputTag = new OutputTag[String]("errors")(Types.STRING) /** An helper method query the Action API and retrive revision content * diff --git a/enrichment/src/test/scala/EnrichmentSuite.scala b/enrichment/src/test/scala/EnrichmentSuite.scala index 2f9bfaf72735be641e893dacf1703147780095ec..a8664c9c1c5944b96e7da7eb978fe951681ace82 100644 --- a/enrichment/src/test/scala/EnrichmentSuite.scala +++ b/enrichment/src/test/scala/EnrichmentSuite.scala @@ -4,26 +4,24 @@ import com.github.tomakehurst.wiremock.core.WireMockConfiguration.options import com.google.common.io.Resources import io.findify.flink.api._ import io.findify.flinkadt.api._ -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.flink.types.Row import org.scalatest.BeforeAndAfter import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import org.wikimedia.eventutilities.core.event.JsonEventGenerator import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory -import org.wikimedia.mediawiki.event.enrichment.Enrichment.{ - enrichPageChange, - makePipeline -} +import org.wikimedia.mediawiki.event.enrichment.Enrichment.enrichPageChange import org.wikimedia.mediawiki.event.enrichment.{ ChangeType, + ErrorSideOutputTag, PageChangeTopic, PageChangeTopicVersion } import java.nio.charset.StandardCharsets +import scala.collection.mutable.ListBuffer import scala.io.Source import scala.jdk.CollectionConverters._ import scala.language.implicitConversions @@ -145,6 +143,7 @@ class EnrichmentIntegrationTest |} |""".stripMargin before { + ErrorSink.errors.clear() flinkCluster.before() env.setParallelism(1) @@ -210,38 +209,32 @@ class EnrichmentIntegrationTest .map( (jsonString: String) => deserializer.deserialize(jsonString.getBytes(StandardCharsets.UTF_8)) - )(TypeInformation.of(classOf[Row])) + )(deserializer.getProducedType) val contentStream = enrichPageChange(dataStream)(localEndpointUrl, eventStreamFactory) + contentStream.getSideOutput(ErrorSideOutputTag).addSink(ErrorSink) val enriched = contentStream.executeAndCollect() - assert(enriched.length == 4) // we expect 3 successes and one error - - JsonEventGenerator.builder() + // we expect 3 successes and one error + assert(enriched.length == 3) + assert(ErrorSink.errors.map(Option(_)).count(_.isDefined) == 1) - enriched.foreach((result: Either[String, Row]) => { - result match { - case Right(e) => { - val content: Option[String] = e.getFieldAs("content") - val revId = e.getField("rev_id") - val changeType = e.getField("change_type") + enriched.foreach(e => { + val content: Option[String] = Option(e.getFieldAs("content")) + val revId = e.getField("rev_id") + val changeType = e.getField("change_type") - assert(revId != revisionCreateMissingRevId) - if (revId === pageCreateRevId) { - assert(changeType === ChangeType.PageCreate) - assert(!content.isEmpty) - } else if (revId === revisionCreateRevId) { - assert(changeType === ChangeType.RevisionCreate) - assert(!content.isEmpty) - } else if (revId === pageDeleteRevId) { - assert(changeType === ChangeType.PageDelete) - } - } - case Left(error) => - assert(!error.isEmpty) - None + assert(revId != revisionCreateMissingRevId) + if (revId === pageCreateRevId) { + assert(changeType === ChangeType.PageCreate) + assert(content.isDefined) + } else if (revId === revisionCreateRevId) { + assert(changeType === ChangeType.RevisionCreate) + assert(content.isDefined) + } else if (revId === pageDeleteRevId) { + assert(changeType === ChangeType.PageDelete) } }) } @@ -262,3 +255,14 @@ class EnrichmentIntegrationTest ) // Triggered by one call to executeAndCollect() from the global env } } + +/** + * Static object to collect errors. + * It must be static as those are collected from another thread and would be serialized to the task executor thread. + */ +object ErrorSink extends SinkFunction[String] { + val errors: ListBuffer[String] = new ListBuffer + + override def invoke(value: String, context: SinkFunction.Context): Unit = + errors.append(value) +} diff --git a/pom.xml b/pom.xml index 764e16934cd9e3281cae73f11d7e3bbe95f3709e..6d2d422b6911dc6f7c9b40a79da5dd631f60acc0 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.wikimedia.discovery discovery-parent-pom - 1.56 + 1.57 org.wikimedia.mediawiki.event mediawiki-event-enrichment @@ -118,20 +118,10 @@ flink-runtime ${flink.version} + org.apache.flink - flink-table-api-java-bridge - ${flink.version} - - - - org.apache.flink - flink-table-planner-loader - ${flink.version} - - - org.apache.flink - flink-table-runtime + flink-table-api-java ${flink.version} @@ -158,13 +148,18 @@ org.wikimedia eventutilities-flink - 1.2.0-SNAPSHOT - - - org.apache.flink - flink-table-api-java - ${flink.version} - provided + 1.2.0 + + + javax.activation + activation + + + + com.github.java-json-tool + json-schema-core + + @@ -274,11 +269,19 @@ org.apache.flink - flink-table-runtime + flink-connector-kafka - org.apache.flink - flink-connector-kafka + com.github.java-json-tools + json-schema-core + + + + org.apache.httpcomponents.client5 + httpclient5