From fd624bba30853f978739ab78591150cd13ffebd0 Mon Sep 17 00:00:00 2001 From: David Causse Date: Tue, 26 Jul 2022 12:21:09 +0200 Subject: [PATCH 1/4] Better use of RowTypeInfo Use the corresponding RowTypeInfo of the Row object being manipulated to ensure some type validation during flink serialization between operators. Errors are routed to a side output. Made some dependency cleanups. --- enrichment/pom.xml | 13 ---- .../enrichment/AsyncEnrichWithContent.scala | 57 ++++++++------- .../event/enrichment/Enrichment.scala | 55 ++++++++------- .../mediawiki/event/enrichment/package.scala | 6 +- .../src/test/scala/EnrichmentSuite.scala | 69 ++++++++++--------- pom.xml | 47 +++++++------ 6 files changed, 125 insertions(+), 122 deletions(-) diff --git a/enrichment/pom.xml b/enrichment/pom.xml index 3f6efe7..35ca864 100644 --- a/enrichment/pom.xml +++ b/enrichment/pom.xml @@ -40,19 +40,6 @@ org.apache.flink flink-connector-kafka - - org.apache.flink - flink-table-api-java-bridge - - - - org.apache.flink - flink-table-planner-loader - - - org.apache.flink - flink-table-runtime - org.scala-lang scala-library diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala index c2a39a0..4c88fdf 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala @@ -3,41 +3,43 @@ package org.wikimedia.mediawiki.event.enrichment import io.findify.flink.api.async.{AsyncFunction, ResultFuture} import org.apache.flink.types.Row import org.slf4j.LoggerFactory -import org.wikimedia.eventutilities.flink.EventStreamRowUtils +import org.wikimedia.eventutilities.flink.{EventRowTypeInfo, EventStreamRowUtils} import org.wikimedia.mediawiki.event.enrichment.wmfapi.{AsyncHttpClient, Query} - import 
java.time.Instant + import scala.concurrent.ExecutionContext import scala.language.implicitConversions import scala.util.{Failure, Success} +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} +import org.apache.flink.api.java.tuple +import org.apache.flink.api.java.typeutils.TupleTypeInfo + /** An asynchronous Flink call back that performs an async query to the Mediawiki Action API. TODO(gmodena, 2022-06-31): implement retry * logic and better error propagation */ class AsyncEnrichWithContent(val actionApiEndpoint: String, - val eventStreamRowUtils: EventStreamRowUtils) - extends AsyncFunction[Row, Either[String, Row]] { + val rowTypeInfo: EventRowTypeInfo) + extends AsyncFunction[Row, tuple.Tuple2[String, Row]] { private val Log = LoggerFactory.getLogger(classOf[AsyncEnrichWithContent]) override def asyncInvoke( event: Row, - resultFuture: ResultFuture[Either[String, Row]] + resultFuture: ResultFuture[tuple.Tuple2[String, Row]] ): Unit = { val change = event.getField("change_type") if (change == ChangeType.PageDelete) { // Delete events do not create a new revision payload. There's not content to retrieve. 
- try { - val enrichedEvent = enrichWithoutContent(event) - resultFuture.complete(Iterable(Right(enrichedEvent))) - } catch { - case ex: Exception => resultFuture.complete(Iterable(Left(ex.toString))) - } - + collectEvents(Right(enrichWithoutContent(event)), resultFuture) } else { actionApiCallback(event, resultFuture) } } + def outputTypeInfo: TupleTypeInfo[tuple.Tuple2[String, Row]] = { + new TupleTypeInfo[tuple.Tuple2[String, Row]](Types.STRING, rowTypeInfo) + } + // TODO move this out private def makeHttpClientForActionApi(event: Row): Query = { val domain: String = event.getFieldAs[Row]("meta").getFieldAs("domain") @@ -49,9 +51,7 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String, } private def enrichWithoutContent(event: Row) = { - val enrichedEvent = eventStreamRowUtils.projectFrom(event, true) - eventStreamRowUtils.setIngestionTime(enrichedEvent, Instant.now) - enrichedEvent + rowTypeInfo.projectFrom(event, true) } /** Send an async request to the Action API and return an enriched event. 
@@ -63,25 +63,32 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String, */ private def actionApiCallback( event: Row, - resultFuture: ResultFuture[Either[String, Row]] + resultFuture: ResultFuture[tuple.Tuple2[String, Row]] ): Unit = { AsyncHttpClient .send(makeHttpClientForActionApi(event)) .map(response => { response.body match { case Right(message) => - enrichWithContent(event, message) match { - case Right(enriched) => - resultFuture.complete(Iterable(Right(enriched))) - case Left(error) => - resultFuture.complete(Iterable(Left(error))) - } + collectEvents(enrichWithContent(event, message), resultFuture) case Left(error) => - resultFuture.complete(Iterable(Left(error))) + collectEvents(Left(error), resultFuture) } })(ExecutionContext.global) } + private def collectEvents(response: Either[String, Row], resultFuture: ResultFuture[tuple.Tuple2[String, Row]]): Unit = { + val transformed = new tuple.Tuple2[String, Row]; + response match { + case Right(message) => + transformed.f1 = message; + case Left(error) => + transformed.f0 = error + transformed.f1 = rowTypeInfo.createEmptyRow() + } + resultFuture.complete(Iterable(transformed)) + } + /** Enrich an event with content retrieved from an Action API query. * * @param event @@ -101,8 +108,8 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String, Left(errorMsg) case Success(content) => // TODO: this is just a stub for testing purposes. 
Replace when the output schema if finialised https://phabricator.wikimedia.org/T311600 - val enrichedEvent = eventStreamRowUtils.projectFrom(event, true) - enrichedEvent.setField("content", content.content) + val enrichedEvent = rowTypeInfo.projectFrom(event, true) + enrichedEvent.setField("content", content.content.orNull) Right(enrichedEvent) } } diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala index b030287..b0c8dda 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala @@ -2,20 +2,21 @@ package org.wikimedia.mediawiki.event.enrichment import io.findify.flink.api._ import org.apache.flink.api.common.eventtime.WatermarkStrategy -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.connector.sink2.Sink import org.apache.flink.connector.kafka.sink.KafkaSink import org.apache.flink.connector.kafka.source.KafkaSource -import org.apache.flink.streaming.api.functions.sink.SinkFunction -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment import org.apache.flink.types.Row import org.slf4j.LoggerFactory -import org.wikimedia.eventutilities.flink.EventStreamRowUtils import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory import org.wikimedia.mediawiki.event.enrichment.wmfapi.Query import java.util.concurrent.TimeUnit import scala.language.implicitConversions +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.Collector +import org.wikimedia.eventutilities.flink.formats.json.KafkaRecordTimestampStrategy case class EventEnrichment[+T](endpointUrl: Query, event: T) @@ -45,8 +46,6 @@ object Enrichment { lazy 
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment - lazy val tableEnv: StreamTableEnvironment = - StreamTableEnvironment.create(env.getJavaEnv) /** Returns a DataStream[Row] of the event stream in Kafka Cluster configured KafkaConfig. */ @@ -65,7 +64,7 @@ object Enrichment { eventSource, WatermarkStrategy.noWatermarks(), PageChangeSourceName - )(TypeInformation.of(classOf[Row])) + )(eventSource.getProducedType) } // scalastyle:off null @@ -79,8 +78,7 @@ object Enrichment { schemaVersion, KafkaConfig.BootstrapServers, PageEnrichedTopic, - null, - null + KafkaRecordTimestampStrategy.ROW_EVENT_TIME ) .build() @@ -118,17 +116,9 @@ object Enrichment { * that do not carry revision content, simplfy push the event forward and enrich without any api call. */ val enrichedStream = enrichPageChange(eventStream = source) - - enrichedStream - .flatMap((f: Either[String, Row]) => { - f match { - case Left(error) => - Log.error(error) - None - case Right(row) => Option(row) - } - })(TypeInformation.of(classOf[Row])) - .sinkTo(sink) + enrichedStream.sinkTo(sink) + enrichedStream.getSideOutput(ErrorSideOutputTag)(ErrorSideOutputTag.getTypeInfo) + .addSink(LoggerFactory.getLogger("enrichment-errors").error(_)) } private def filterEventsBySize(source: DataStream[Row]): DataStream[Row] = { @@ -162,23 +152,32 @@ object Enrichment { def enrichPageChange(eventStream: DataStream[Row])( implicit actionApiEndpoint: String, eventStreamFactory: EventDataStreamFactory - ): DataStream[Either[String, Row]] = { + ): DataStream[Row] = { val typeInfo = eventStreamFactory.rowTypeInfo( PageEnrichedTopic, PageEnrichedTopicVersion ) - val eventStreamRowUtils = new EventStreamRowUtils(typeInfo) - AsyncDataStream.unorderedWait( + val asyncOp = new AsyncEnrichWithContent( + actionApiEndpoint = actionApiEndpoint, + rowTypeInfo = typeInfo) + + val stream = AsyncDataStream.unorderedWait( filterEventsBySize(eventStream), - new AsyncEnrichWithContent( - actionApiEndpoint = 
actionApiEndpoint, - eventStreamRowUtils = eventStreamRowUtils - ), + asyncOp, timeout = AsyncFunctionTimeoutMs, TimeUnit.MILLISECONDS, AsyncDataStreamCapacity - )(TypeInformation.of(classOf[Either[String, Row]])) + )(asyncOp.outputTypeInfo) + + // Route errors to a side output + stream.process((event: Tuple2[String, Row], context: ProcessFunction[Tuple2[String, Row], Row]#Context, collector: Collector[Row]) => { + (Option(event.f0), Option(event.f1)) match { + case (Some(error), _) => + context.output(ErrorSideOutputTag, error) + case (None, Some(row)) => collector.collect(row) + } + })(typeInfo) } } diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala index 5cb15e6..63977ba 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala @@ -2,10 +2,13 @@ package org.wikimedia.mediawiki.event import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory import org.wikimedia.mediawiki.event.enrichment.wmfapi.QueryBuilder - import scala.collection.immutable.ListMap import scala.jdk.CollectionConverters._ +import io.findify.flink.api.OutputTag +import org.apache.flink.api.common.typeinfo.Types +import org.apache.flink.streaming.api.operators.Output + /** This package object contains a few utilities that don't have a well defined place yet. 
*/ //noinspection ScalaDeprecation package object enrichment { @@ -30,6 +33,7 @@ package object enrichment { val UserAgent = "wmf-mediawiki-stream-enrichment/1.0-SNAPSHOT bot" val PageChangeTopicPrefix = s"eqiad.mediawiki" val RevisionCreateWithContentPrefix = "mediawiki" + val ErrorSideOutputTag = new OutputTag[String]("errors")(Types.STRING) /** An helper method query the Action API and retrive revision content * diff --git a/enrichment/src/test/scala/EnrichmentSuite.scala b/enrichment/src/test/scala/EnrichmentSuite.scala index 2f9bfaf..9278e04 100644 --- a/enrichment/src/test/scala/EnrichmentSuite.scala +++ b/enrichment/src/test/scala/EnrichmentSuite.scala @@ -13,20 +13,16 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.wikimedia.eventutilities.core.event.JsonEventGenerator import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory -import org.wikimedia.mediawiki.event.enrichment.Enrichment.{ - enrichPageChange, - makePipeline -} -import org.wikimedia.mediawiki.event.enrichment.{ - ChangeType, - PageChangeTopic, - PageChangeTopicVersion -} +import org.wikimedia.mediawiki.event.enrichment.Enrichment.{enrichPageChange, makePipeline} +import org.wikimedia.mediawiki.event.enrichment.{ChangeType, ErrorSideOutputTag, PageChangeTopic, PageChangeTopicVersion} import java.nio.charset.StandardCharsets +import scala.collection.mutable.ListBuffer import scala.io.Source import scala.jdk.CollectionConverters._ import scala.language.implicitConversions +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.streaming.api.functions.sink.SinkFunction // scalastyle:off line.size.limit class EnrichmentIntegrationTest @@ -145,6 +141,7 @@ class EnrichmentIntegrationTest |} |""".stripMargin before { + ErrorSink.errors.clear() flinkCluster.before() env.setParallelism(1) @@ -210,39 +207,33 @@ class EnrichmentIntegrationTest .map( (jsonString: String) => 
deserializer.deserialize(jsonString.getBytes(StandardCharsets.UTF_8)) - )(TypeInformation.of(classOf[Row])) + )(deserializer.getProducedType) val contentStream = enrichPageChange(dataStream)(localEndpointUrl, eventStreamFactory) + contentStream.getSideOutput(ErrorSideOutputTag).addSink(ErrorSink) val enriched = contentStream.executeAndCollect() - assert(enriched.length == 4) // we expect 3 successes and one error + // we expect 3 successes and one error + assert(enriched.length == 3) + assert (ErrorSink.errors.map(Option(_)).count(_.isDefined) == 1) - JsonEventGenerator.builder() + enriched.foreach(e => { + val content: Option[String] = Option(e.getFieldAs("content")) + val revId = e.getField ("rev_id") + val changeType = e.getField ("change_type") - enriched.foreach((result: Either[String, Row]) => { - result match { - case Right(e) => { - val content: Option[String] = e.getFieldAs("content") - val revId = e.getField("rev_id") - val changeType = e.getField("change_type") - - assert(revId != revisionCreateMissingRevId) - if (revId === pageCreateRevId) { - assert(changeType === ChangeType.PageCreate) - assert(!content.isEmpty) - } else if (revId === revisionCreateRevId) { - assert(changeType === ChangeType.RevisionCreate) - assert(!content.isEmpty) - } else if (revId === pageDeleteRevId) { - assert(changeType === ChangeType.PageDelete) - } - } - case Left(error) => - assert(!error.isEmpty) - None - } + assert (revId != revisionCreateMissingRevId) + if (revId === pageCreateRevId) { + assert (changeType === ChangeType.PageCreate) + assert (content.isDefined) + } else if (revId === revisionCreateRevId) { + assert (changeType === ChangeType.RevisionCreate) + assert (content.isDefined) + } else if (revId === pageDeleteRevId) { + assert (changeType === ChangeType.PageDelete) + } }) } @@ -262,3 +253,13 @@ class EnrichmentIntegrationTest ) // Triggered by one call to executeAndCollect() from the global env } } + +/** + * Static object to collect errors. 
+ * It must be static as those are collected from another thread and would be serialized to the task executor thread. + */ +object ErrorSink extends SinkFunction[String] { + val errors: ListBuffer[String] = new ListBuffer + + override def invoke(value: String, context: SinkFunction.Context): Unit = errors.append(value) +} diff --git a/pom.xml b/pom.xml index 764e169..d4da1c3 100644 --- a/pom.xml +++ b/pom.xml @@ -118,20 +118,10 @@ flink-runtime ${flink.version} + org.apache.flink - flink-table-api-java-bridge - ${flink.version} - - - - org.apache.flink - flink-table-planner-loader - ${flink.version} - - - org.apache.flink - flink-table-runtime + flink-table-api-java ${flink.version} @@ -159,12 +149,12 @@ org.wikimedia eventutilities-flink 1.2.0-SNAPSHOT - - - org.apache.flink - flink-table-api-java - ${flink.version} - provided + + + javax.activation + activation + + @@ -274,11 +264,15 @@ org.apache.flink - flink-table-runtime + flink-connector-kafka + - org.apache.flink - flink-connector-kafka + org.apache.httpcomponents.client5 + httpclient5 @@ -320,5 +314,16 @@ + + + + org.apache.maven.plugins + maven-resources-plugin + 3.1.0 + + -- GitLab From 1c684e462beb0999f37be3fe7ea1ac3f5a6ef6c4 Mon Sep 17 00:00:00 2001 From: David Causse Date: Tue, 26 Jul 2022 14:34:50 +0200 Subject: [PATCH 2/4] Bump parent pom version and drop explicit downgrade of the maven-resources-plugin --- pom.xml | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index d4da1c3..c3df78c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.wikimedia.discovery discovery-parent-pom - 1.56 + 1.57 org.wikimedia.mediawiki.event mediawiki-event-enrichment @@ -314,16 +314,5 @@ - - - - org.apache.maven.plugins - maven-resources-plugin - 3.1.0 - - -- GitLab From 830fe3566db0a630426f9afd86daab53b013c9ea Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Tue, 30 Aug 2022 13:32:28 +0200 Subject: [PATCH 3/4] Fix imports --- 
.../enrichment/AsyncEnrichWithContent.scala | 35 ++++++------- .../event/enrichment/Enrichment.scala | 31 ++++++----- .../mediawiki/event/enrichment/package.scala | 7 ++- .../src/test/scala/EnrichmentSuite.scala | 51 ++++++++++--------- 4 files changed, 66 insertions(+), 58 deletions(-) diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala index 4c88fdf..2b36ab0 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/AsyncEnrichWithContent.scala @@ -1,20 +1,18 @@ package org.wikimedia.mediawiki.event.enrichment import io.findify.flink.api.async.{AsyncFunction, ResultFuture} +import org.apache.flink.api.common.typeinfo.Types +import org.apache.flink.api.java.tuple +import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.types.Row import org.slf4j.LoggerFactory -import org.wikimedia.eventutilities.flink.{EventRowTypeInfo, EventStreamRowUtils} +import org.wikimedia.eventutilities.flink.EventRowTypeInfo import org.wikimedia.mediawiki.event.enrichment.wmfapi.{AsyncHttpClient, Query} -import java.time.Instant import scala.concurrent.ExecutionContext import scala.language.implicitConversions import scala.util.{Failure, Success} -import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} -import org.apache.flink.api.java.tuple -import org.apache.flink.api.java.typeutils.TupleTypeInfo - /** An asynchronous Flink call back that performs an async query to the Mediawiki Action API. 
TODO(gmodena, 2022-06-31): implement retry * logic and better error propagation */ @@ -37,7 +35,7 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String, } def outputTypeInfo: TupleTypeInfo[tuple.Tuple2[String, Row]] = { - new TupleTypeInfo[tuple.Tuple2[String, Row]](Types.STRING, rowTypeInfo) + new TupleTypeInfo[tuple.Tuple2[String, Row]](Types.STRING, rowTypeInfo) } // TODO move this out @@ -77,16 +75,19 @@ class AsyncEnrichWithContent(val actionApiEndpoint: String, })(ExecutionContext.global) } - private def collectEvents(response: Either[String, Row], resultFuture: ResultFuture[tuple.Tuple2[String, Row]]): Unit = { - val transformed = new tuple.Tuple2[String, Row]; - response match { - case Right(message) => - transformed.f1 = message; - case Left(error) => - transformed.f0 = error - transformed.f1 = rowTypeInfo.createEmptyRow() - } - resultFuture.complete(Iterable(transformed)) + private def collectEvents( + response: Either[String, Row], + resultFuture: ResultFuture[tuple.Tuple2[String, Row]] + ): Unit = { + val transformed = new tuple.Tuple2[String, Row]; + response match { + case Right(message) => + transformed.f1 = message; + case Left(error) => + transformed.f0 = error + transformed.f1 = rowTypeInfo.createEmptyRow() + } + resultFuture.complete(Iterable(transformed)) } /** Enrich an event with content retrieved from an Action API query. 
diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala index b0c8dda..c5bd709 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/Enrichment.scala @@ -2,21 +2,20 @@ package org.wikimedia.mediawiki.event.enrichment import io.findify.flink.api._ import org.apache.flink.api.common.eventtime.WatermarkStrategy -import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.connector.sink2.Sink +import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.connector.kafka.sink.KafkaSink import org.apache.flink.connector.kafka.source.KafkaSource +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.types.Row +import org.apache.flink.util.Collector import org.slf4j.LoggerFactory +import org.wikimedia.eventutilities.flink.formats.json.KafkaRecordTimestampStrategy import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory import org.wikimedia.mediawiki.event.enrichment.wmfapi.Query import java.util.concurrent.TimeUnit import scala.language.implicitConversions -import org.apache.flink.api.java.tuple.Tuple2 -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.util.Collector -import org.wikimedia.eventutilities.flink.formats.json.KafkaRecordTimestampStrategy case class EventEnrichment[+T](endpointUrl: Query, event: T) @@ -117,7 +116,8 @@ object Enrichment { */ val enrichedStream = enrichPageChange(eventStream = source) enrichedStream.sinkTo(sink) - enrichedStream.getSideOutput(ErrorSideOutputTag)(ErrorSideOutputTag.getTypeInfo) + enrichedStream + .getSideOutput(ErrorSideOutputTag)(ErrorSideOutputTag.getTypeInfo) .addSink(LoggerFactory.getLogger("enrichment-errors").error(_)) } @@ -160,8 +160,9 @@ object Enrichment { ) val asyncOp 
= new AsyncEnrichWithContent( - actionApiEndpoint = actionApiEndpoint, - rowTypeInfo = typeInfo) + actionApiEndpoint = actionApiEndpoint, + rowTypeInfo = typeInfo + ) val stream = AsyncDataStream.unorderedWait( filterEventsBySize(eventStream), @@ -172,12 +173,16 @@ object Enrichment { )(asyncOp.outputTypeInfo) // Route errors to a side output - stream.process((event: Tuple2[String, Row], context: ProcessFunction[Tuple2[String, Row], Row]#Context, collector: Collector[Row]) => { + stream.process( + (event: Tuple2[String, Row], + context: ProcessFunction[Tuple2[String, Row], Row]#Context, + collector: Collector[Row]) => { (Option(event.f0), Option(event.f1)) match { - case (Some(error), _) => - context.output(ErrorSideOutputTag, error) - case (None, Some(row)) => collector.collect(row) + case (Some(error), _) => + context.output(ErrorSideOutputTag, error) + case (None, Some(row)) => collector.collect(row) } - })(typeInfo) + } + )(typeInfo) } } diff --git a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala index 63977ba..2082325 100644 --- a/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala +++ b/enrichment/src/main/scala/org/wikimedia/mediawiki/event/enrichment/package.scala @@ -1,14 +1,13 @@ package org.wikimedia.mediawiki.event +import io.findify.flink.api.OutputTag +import org.apache.flink.api.common.typeinfo.Types import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory import org.wikimedia.mediawiki.event.enrichment.wmfapi.QueryBuilder + import scala.collection.immutable.ListMap import scala.jdk.CollectionConverters._ -import io.findify.flink.api.OutputTag -import org.apache.flink.api.common.typeinfo.Types -import org.apache.flink.streaming.api.operators.Output - /** This package object contains a few utilities that don't have a well defined place yet. 
*/ //noinspection ScalaDeprecation package object enrichment { diff --git a/enrichment/src/test/scala/EnrichmentSuite.scala b/enrichment/src/test/scala/EnrichmentSuite.scala index 9278e04..a8664c9 100644 --- a/enrichment/src/test/scala/EnrichmentSuite.scala +++ b/enrichment/src/test/scala/EnrichmentSuite.scala @@ -4,25 +4,27 @@ import com.github.tomakehurst.wiremock.core.WireMockConfiguration.options import com.google.common.io.Resources import io.findify.flink.api._ import io.findify.flinkadt.api._ -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.flink.types.Row import org.scalatest.BeforeAndAfter import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import org.wikimedia.eventutilities.core.event.JsonEventGenerator import org.wikimedia.eventutilities.flink.stream.EventDataStreamFactory -import org.wikimedia.mediawiki.event.enrichment.Enrichment.{enrichPageChange, makePipeline} -import org.wikimedia.mediawiki.event.enrichment.{ChangeType, ErrorSideOutputTag, PageChangeTopic, PageChangeTopicVersion} +import org.wikimedia.mediawiki.event.enrichment.Enrichment.enrichPageChange +import org.wikimedia.mediawiki.event.enrichment.{ + ChangeType, + ErrorSideOutputTag, + PageChangeTopic, + PageChangeTopicVersion +} import java.nio.charset.StandardCharsets import scala.collection.mutable.ListBuffer import scala.io.Source import scala.jdk.CollectionConverters._ import scala.language.implicitConversions -import org.apache.flink.api.java.tuple.Tuple2 -import org.apache.flink.streaming.api.functions.sink.SinkFunction // scalastyle:off line.size.limit class EnrichmentIntegrationTest @@ -217,23 +219,23 @@ class EnrichmentIntegrationTest // we expect 3 successes and one error assert(enriched.length == 3) - assert 
(ErrorSink.errors.map(Option(_)).count(_.isDefined) == 1) + assert(ErrorSink.errors.map(Option(_)).count(_.isDefined) == 1) enriched.foreach(e => { - val content: Option[String] = Option(e.getFieldAs("content")) - val revId = e.getField ("rev_id") - val changeType = e.getField ("change_type") + val content: Option[String] = Option(e.getFieldAs("content")) + val revId = e.getField("rev_id") + val changeType = e.getField("change_type") - assert (revId != revisionCreateMissingRevId) - if (revId === pageCreateRevId) { - assert (changeType === ChangeType.PageCreate) - assert (content.isDefined) - } else if (revId === revisionCreateRevId) { - assert (changeType === ChangeType.RevisionCreate) - assert (content.isDefined) - } else if (revId === pageDeleteRevId) { - assert (changeType === ChangeType.PageDelete) - } + assert(revId != revisionCreateMissingRevId) + if (revId === pageCreateRevId) { + assert(changeType === ChangeType.PageCreate) + assert(content.isDefined) + } else if (revId === revisionCreateRevId) { + assert(changeType === ChangeType.RevisionCreate) + assert(content.isDefined) + } else if (revId === pageDeleteRevId) { + assert(changeType === ChangeType.PageDelete) + } }) } @@ -255,11 +257,12 @@ class EnrichmentIntegrationTest } /** - * Static object to collect errors. - * It must be static as those are collected from another thread and would be serialized to the task executor thread. - */ + * Static object to collect errors. + * It must be static as those are collected from another thread and would be serialized to the task executor thread. 
+ */ object ErrorSink extends SinkFunction[String] { val errors: ListBuffer[String] = new ListBuffer - override def invoke(value: String, context: SinkFunction.Context): Unit = errors.append(value) + override def invoke(value: String, context: SinkFunction.Context): Unit = + errors.append(value) } -- GitLab From 2aa8d094a9b210ec165b4ba9d099ac031100df10 Mon Sep 17 00:00:00 2001 From: Gabriele Modena Date: Tue, 30 Aug 2022 21:42:40 +0200 Subject: [PATCH 4/4] Fix duplicate resource warning --- pom.xml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c3df78c..6d2d422 100644 --- a/pom.xml +++ b/pom.xml @@ -148,12 +148,17 @@ org.wikimedia eventutilities-flink - 1.2.0-SNAPSHOT + 1.2.0 javax.activation activation + + + com.github.java-json-tools + json-schema-core + @@ -266,6 +271,10 @@ org.apache.flink flink-connector-kafka + + com.github.java-json-tools + json-schema-core +