diff --git a/enrichment/pom.xml b/enrichment/pom.xml
index 3f6efe7b42b7ce63f12ea2d8975d792844e14398..35ca864c975407b965aa020dbe7294fb588e6326 100644
--- a/enrichment/pom.xml
+++ b/enrichment/pom.xml
@@ -40,19 +40,6 @@
org.apache.flink
flink-connector-kafka
-
- org.apache.flink
- flink-table-api-java-bridge
-
-
-
- org.apache.flink
- flink-table-planner-loader
-
-
- org.apache.flink
- flink-table-runtime
-
org.scala-lang
scala-library
diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala
index c2a39a05cad3cd099cd119764ebca57b1ce599b8..2b36ab02a34ce72e8441be8c9761eca227975f8c 100644
--- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala
+++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala
@@ -1,12 +1,14 @@
package org.wikimedia.mediawiki.event.enrichment
import io.findify.flink.api.async.{AsyncFunction, ResultFuture}
+import org.apache.flink.api.common.typeinfo.Types
+import org.apache.flink.api.java.tuple
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.types.Row
import org.slf4j.LoggerFactory
-import org.wikimedia.eventutilities.flink.EventStreamRowUtils
+import org.wikimedia.eventutilities.flink.EventRowTypeInfo
import org.wikimedia.mediawiki.event.enrichment.wmfapi.{AsyncHttpClient, Query}
-import java.time.Instant
import scala.concurrent.ExecutionContext
import scala.language.implicitConversions
import scala.util.{Failure, Success}
@@ -15,29 +17,27 @@ import scala.util.{Failure, Success}
* logic and better error propagation
*/
class AsyncEnrichWithContent(val actionApiEndpoint: String,
- val eventStreamRowUtils: EventStreamRowUtils)
- extends AsyncFunction[Row, Either[String, Row]] {
+ val rowTypeInfo: EventRowTypeInfo)
+ extends AsyncFunction[Row, tuple.Tuple2[String, Row]] {
private val Log = LoggerFactory.getLogger(classOf[AsyncEnrichWithContent])
override def asyncInvoke(
event: Row,
- resultFuture: ResultFuture[Either[String, Row]]
+ resultFuture: ResultFuture[tuple.Tuple2[String, Row]]
): Unit = {
val change = event.getField("change_type")
if (change == ChangeType.PageDelete) {
// Delete events do not create a new revision payload. There's not content to retrieve.
- try {
- val enrichedEvent = enrichWithoutContent(event)
- resultFuture.complete(Iterable(Right(enrichedEvent)))
- } catch {
- case ex: Exception => resultFuture.complete(Iterable(Left(ex.toString)))
- }
-
+ collectEvents(Right(enrichWithoutContent(event)), resultFuture)
} else {
actionApiCallback(event, resultFuture)
}
}
+ def outputTypeInfo: TupleTypeInfo[tuple.Tuple2[String, Row]] = {
+ new TupleTypeInfo[tuple.Tuple2[String, Row]](Types.STRING, rowTypeInfo)
+ }
+
// TODO move this out
private def makeHttpClientForActionApi(event: Row): Query = {
val domain: String = event.getFieldAs[Row]("meta").getFieldAs("domain")
@@ -49,9 +49,7 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String,
}
private def enrichWithoutContent(event: Row) = {
- val enrichedEvent = eventStreamRowUtils.projectFrom(event, true)
- eventStreamRowUtils.setIngestionTime(enrichedEvent, Instant.now)
- enrichedEvent
+ rowTypeInfo.projectFrom(event, true)
}
/** Send an async request to the Action API and return an enriched event.
@@ -63,25 +61,35 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String,
*/
private def actionApiCallback(
event: Row,
- resultFuture: ResultFuture[Either[String, Row]]
+ resultFuture: ResultFuture[tuple.Tuple2[String, Row]]
): Unit = {
AsyncHttpClient
.send(makeHttpClientForActionApi(event))
.map(response => {
response.body match {
case Right(message) =>
- enrichWithContent(event, message) match {
- case Right(enriched) =>
- resultFuture.complete(Iterable(Right(enriched)))
- case Left(error) =>
- resultFuture.complete(Iterable(Left(error)))
- }
+ collectEvents(enrichWithContent(event, message), resultFuture)
case Left(error) =>
- resultFuture.complete(Iterable(Left(error)))
+ collectEvents(Left(error), resultFuture)
}
})(ExecutionContext.global)
}
+ private def collectEvents(
+ response: Either[String, Row],
+ resultFuture: ResultFuture[tuple.Tuple2[String, Row]]
+ ): Unit = {
+ val transformed = new tuple.Tuple2[String, Row];
+ response match {
+ case Right(message) =>
+ transformed.f1 = message;
+ case Left(error) =>
+ transformed.f0 = error
+ transformed.f1 = rowTypeInfo.createEmptyRow()
+ }
+ resultFuture.complete(Iterable(transformed))
+ }
+
/** Enrich an event with content retrieved from an Action API query.
*
* @param event
@@ -101,8 +109,8 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String,
Left(errorMsg)
case Success(content) =>
// TODO: this is just a stub for testing purposes. Replace when the output schema if finialised https://phabricator.wikimedia.org/T311600
- val enrichedEvent = eventStreamRowUtils.projectFrom(event, true)
- enrichedEvent.setField("content", content.content)
+ val enrichedEvent = rowTypeInfo.projectFrom(event, true)
+ enrichedEvent.setField("content", content.content.orNull)
Right(enrichedEvent)
}
}
diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala
index b0302870b1dee582500fdf6092aa5cb4a6434cb3..c5bd7098d4f8f36a9bf06c70e2cf9299af255aad 100644
--- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala
+++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala
@@ -2,15 +2,15 @@ package org.wikimedia.mediawiki.event.enrichment
import io.findify.flink.api._
import org.apache.flink.api.common.eventtime.WatermarkStrategy
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.sink2.Sink
+import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.connector.kafka.source.KafkaSource
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
import org.slf4j.LoggerFactory
-import org.wikimedia.eventutilities.flink.EventStreamRowUtils
+import org.wikimedia.eventutilities.flink.formats.json.KafkaRecordTimestampStrategy
import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory
import org.wikimedia.mediawiki.event.enrichment.wmfapi.Query
@@ -45,8 +45,6 @@ object Enrichment {
lazy val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
- lazy val tableEnv: StreamTableEnvironment =
- StreamTableEnvironment.create(env.getJavaEnv)
/** Returns a DataStream[Row] of the event stream in Kafka Cluster configured KafkaConfig.
*/
@@ -65,7 +63,7 @@ object Enrichment {
eventSource,
WatermarkStrategy.noWatermarks(),
PageChangeSourceName
- )(TypeInformation.of(classOf[Row]))
+ )(eventSource.getProducedType)
}
// scalastyle:off null
@@ -79,8 +77,7 @@ object Enrichment {
schemaVersion,
KafkaConfig.BootstrapServers,
PageEnrichedTopic,
- null,
- null
+ KafkaRecordTimestampStrategy.ROW_EVENT_TIME
)
.build()
@@ -118,17 +115,10 @@ object Enrichment {
* that do not carry revision content, simplfy push the event forward and enrich without any api call.
*/
val enrichedStream = enrichPageChange(eventStream = source)
-
+ enrichedStream.sinkTo(sink)
enrichedStream
- .flatMap((f: Either[String, Row]) => {
- f match {
- case Left(error) =>
- Log.error(error)
- None
- case Right(row) => Option(row)
- }
- })(TypeInformation.of(classOf[Row]))
- .sinkTo(sink)
+ .getSideOutput(ErrorSideOutputTag)(ErrorSideOutputTag.getTypeInfo)
+ .addSink(LoggerFactory.getLogger("enrichment-errors").error(_))
}
private def filterEventsBySize(source: DataStream[Row]): DataStream[Row] = {
@@ -162,23 +152,37 @@ object Enrichment {
def enrichPageChange(eventStream: DataStream[Row])(
implicit actionApiEndpoint: String,
eventStreamFactory: EventDataStreamFactory
- ): DataStream[Either[String, Row]] = {
+ ): DataStream[Row] = {
val typeInfo =
eventStreamFactory.rowTypeInfo(
PageEnrichedTopic,
PageEnrichedTopicVersion
)
- val eventStreamRowUtils = new EventStreamRowUtils(typeInfo)
- AsyncDataStream.unorderedWait(
+ val asyncOp = new AsyncEnrichWithContent(
+ actionApiEndpoint = actionApiEndpoint,
+ rowTypeInfo = typeInfo
+ )
+
+ val stream = AsyncDataStream.unorderedWait(
filterEventsBySize(eventStream),
- new AsyncEnrichWithContent(
- actionApiEndpoint = actionApiEndpoint,
- eventStreamRowUtils = eventStreamRowUtils
- ),
+ asyncOp,
timeout = AsyncFunctionTimeoutMs,
TimeUnit.MILLISECONDS,
AsyncDataStreamCapacity
- )(TypeInformation.of(classOf[Either[String, Row]]))
+ )(asyncOp.outputTypeInfo)
+
+ // Route errors to a side output
+ stream.process(
+ (event: Tuple2[String, Row],
+ context: ProcessFunction[Tuple2[String, Row], Row]#Context,
+ collector: Collector[Row]) => {
+ (Option(event.f0), Option(event.f1)) match {
+ case (Some(error), _) =>
+ context.output(ErrorSideOutputTag, error)
+ case (None, Some(row)) => collector.collect(row)
+ }
+ }
+ )(typeInfo)
}
}
diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala
index 5cb15e6e4894fba605e2ce9cbaf2ed9d24a69fc9..2082325bdcec27e369d9d5cf9860b2a459acd8aa 100644
--- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala
+++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala
@@ -1,5 +1,7 @@
package org.wikimedia.mediawiki.event
+import io.findify.flink.api.OutputTag
+import org.apache.flink.api.common.typeinfo.Types
import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory
import org.wikimedia.mediawiki.event.enrichment.wmfapi.QueryBuilder
@@ -30,6 +32,7 @@ package object enrichment {
val UserAgent = "wmf-mediawiki-stream-enrichment/1.0-SNAPSHOT bot"
val PageChangeTopicPrefix = s"eqiad.mediawiki"
val RevisionCreateWithContentPrefix = "mediawiki"
+ val ErrorSideOutputTag = new OutputTag[String]("errors")(Types.STRING)
/** An helper method query the Action API and retrive revision content
*
diff --git a/enrichment/src/test/scala/EnrichmentSuite.scala b/enrichment/src/test/scala/EnrichmentSuite.scala
index 2f9bfaf72735be641e893dacf1703147780095ec..a8664c9c1c5944b96e7da7eb978fe951681ace82 100644
--- a/enrichment/src/test/scala/EnrichmentSuite.scala
+++ b/enrichment/src/test/scala/EnrichmentSuite.scala
@@ -4,26 +4,24 @@ import com.github.tomakehurst.wiremock.core.WireMockConfiguration.options
import com.google.common.io.Resources
import io.findify.flink.api._
import io.findify.flinkadt.api._
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.flink.types.Row
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
-import org.wikimedia.eventutilities.core.event.JsonEventGenerator
import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory
-import org.wikimedia.mediawiki.event.enrichment.Enrichment.{
- enrichPageChange,
- makePipeline
-}
+import org.wikimedia.mediawiki.event.enrichment.Enrichment.enrichPageChange
import org.wikimedia.mediawiki.event.enrichment.{
ChangeType,
+ ErrorSideOutputTag,
PageChangeTopic,
PageChangeTopicVersion
}
import java.nio.charset.StandardCharsets
+import scala.collection.mutable.ListBuffer
import scala.io.Source
import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
@@ -145,6 +143,7 @@ class EnrichmentIntegrationTest
|}
|""".stripMargin
before {
+ ErrorSink.errors.clear()
flinkCluster.before()
env.setParallelism(1)
@@ -210,38 +209,32 @@ class EnrichmentIntegrationTest
.map(
(jsonString: String) =>
deserializer.deserialize(jsonString.getBytes(StandardCharsets.UTF_8))
- )(TypeInformation.of(classOf[Row]))
+ )(deserializer.getProducedType)
val contentStream =
enrichPageChange(dataStream)(localEndpointUrl, eventStreamFactory)
+ contentStream.getSideOutput(ErrorSideOutputTag).addSink(ErrorSink)
val enriched = contentStream.executeAndCollect()
- assert(enriched.length == 4) // we expect 3 successes and one error
-
- JsonEventGenerator.builder()
+ // we expect 3 successes and one error
+ assert(enriched.length == 3)
+ assert(ErrorSink.errors.map(Option(_)).count(_.isDefined) == 1)
- enriched.foreach((result: Either[String, Row]) => {
- result match {
- case Right(e) => {
- val content: Option[String] = e.getFieldAs("content")
- val revId = e.getField("rev_id")
- val changeType = e.getField("change_type")
+ enriched.foreach(e => {
+ val content: Option[String] = Option(e.getFieldAs("content"))
+ val revId = e.getField("rev_id")
+ val changeType = e.getField("change_type")
- assert(revId != revisionCreateMissingRevId)
- if (revId === pageCreateRevId) {
- assert(changeType === ChangeType.PageCreate)
- assert(!content.isEmpty)
- } else if (revId === revisionCreateRevId) {
- assert(changeType === ChangeType.RevisionCreate)
- assert(!content.isEmpty)
- } else if (revId === pageDeleteRevId) {
- assert(changeType === ChangeType.PageDelete)
- }
- }
- case Left(error) =>
- assert(!error.isEmpty)
- None
+ assert(revId != revisionCreateMissingRevId)
+ if (revId === pageCreateRevId) {
+ assert(changeType === ChangeType.PageCreate)
+ assert(content.isDefined)
+ } else if (revId === revisionCreateRevId) {
+ assert(changeType === ChangeType.RevisionCreate)
+ assert(content.isDefined)
+ } else if (revId === pageDeleteRevId) {
+ assert(changeType === ChangeType.PageDelete)
}
})
}
@@ -262,3 +255,14 @@ class EnrichmentIntegrationTest
) // Triggered by one call to executeAndCollect() from the global env
}
}
+
+/**
+ * Static object to collect errors.
+ * It must be static as those are collected from another thread and would be serialized to the task executor thread.
+ */
+object ErrorSink extends SinkFunction[String] {
+ val errors: ListBuffer[String] = new ListBuffer
+
+ override def invoke(value: String, context: SinkFunction.Context): Unit =
+ errors.append(value)
+}
diff --git a/pom.xml b/pom.xml
index 764e16934cd9e3281cae73f11d7e3bbe95f3709e..6d2d422b6911dc6f7c9b40a79da5dd631f60acc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.wikimedia.discovery
discovery-parent-pom
- 1.56
+ 1.57
org.wikimedia.mediawiki.event
mediawiki-event-enrichment
@@ -118,20 +118,10 @@
flink-runtime
${flink.version}
+
org.apache.flink
- flink-table-api-java-bridge
- ${flink.version}
-
-
-
- org.apache.flink
- flink-table-planner-loader
- ${flink.version}
-
-
- org.apache.flink
- flink-table-runtime
+ flink-table-api-java
${flink.version}
@@ -158,13 +148,18 @@
org.wikimedia
eventutilities-flink
- 1.2.0-SNAPSHOT
-
-
- org.apache.flink
- flink-table-api-java
- ${flink.version}
- provided
+ 1.2.0
+
+
+ javax.activation
+ activation
+
+
+
+ com.github.java-json-tool
+ json-schema-core
+
+
@@ -274,11 +269,19 @@
org.apache.flink
- flink-table-runtime
+ flink-connector-kafka
- org.apache.flink
- flink-connector-kafka
+ com.github.java-json-tools
+ json-schema-core
+
+
+
+ org.apache.httpcomponents.client5
+ httpclient5