diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..bcb308c7dbc6f0aca57aec210dec5e4e74feeddc --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +/.bloop/ +/.idea/ +/.metals/ +# Ignore maven build output directory +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +# https://github.com/takari/maven-wrapper#usage-without-binary-jar +.mvn/wrapper/maven-wrapper.jar + +# Eclipse m2e generated files +# Eclipse Core +.project +# JDT-specific (Eclipse Java Development Tools) +.classpath diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..e7825a6b4ec514a39b0144c942c0189dd1ea7443 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,46 @@ +# To contribute improvements to CI/CD templates, please follow the Development guide at: +# https://docs.gitlab.com/ee/development/cicd/templates.html +# This specific template is located at: +# https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/gitlab/ci/templates/Maven.gitlab-ci.yml + +# Build JAVA applications using Apache Maven (http://maven.apache.org) +# For docker image tags see https://hub.docker.com/_/maven/ +# +# For general lifecycle information see https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html + +# This template will build and test your projects +# * Caches downloaded dependencies and plugins between invocation. +# * Verify but don't deploy merge requests. +# * Deploy built artifacts from master branch only. + +variables: + # This will suppress any download for dependencies and plugins or upload messages which would clutter the console log. + # `showDateTime` will show the passed time in milliseconds. You need to specify `--batch-mode` to make this work. + MAVEN_OPTS: "-Dhttps.protocols=TLSv1.2 -Dmaven.repo.local=$CI_PROJECT_DIR/.m2/repository -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=WARN -Dorg.slf4j.simpleLogger.showDateTime=true -Djava.awt.headless=true" + # As of Maven 3.3.0 instead of this you may define these options in `.mvn/maven.config` so the same config is used + # when running from the command line. + # `installAtEnd` and `deployAtEnd` are only effective with recent version of the corresponding plugins. + MAVEN_CLI_OPTS: "--batch-mode --errors --fail-at-end --show-version -DinstallAtEnd=true -DdeployAtEnd=true" + +# This template uses jdk8 for verifying and deploying images +image: maven:3.3.9-jdk-8 + +# Cache downloaded dependencies and plugins between builds. +# To keep cache across branches add 'key: "$CI_JOB_NAME"' +cache: + paths: + - .m2/repository + +# For merge requests do not `deploy` but only run `verify`. +# See https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html +.verify: &verify + stage: test + script: + - 'mvn $MAVEN_CLI_OPTS verify' + except: + variables: + - $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH + +# Verify merge requests using JDK8 +verify:jdk8: + <<: *verify diff --git a/README.md b/README.md index 6d275614ea3019b95df7ca480b8a09eb9e0e3915..df2c7d61bd976fbeb1e955a6258aa7e6102b1d5a 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,15 @@ Implementation of [T307959](https://phabricator.wikimedia.org/T307959). A POC Flink Service to Combine Existing Streams, Enrich and Output to New Topic. + +# Test and build + +Test with +```bash +mvn clean verify +``` + +Build a jar with dependencies with +```bash +mvn clean package +``` \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..136157aa6426740ff8e245ce8b176305544b05eb --- /dev/null +++ b/pom.xml @@ -0,0 +1,331 @@ + + + 4.0.0 + + + org.wikimedia.discovery + discovery-parent-pom + 1.54 + + org.wikimedia.dataplatform + enrichment + 1.0-SNAPSHOT + + mediawiki-stream-enrichment + + + 1.15.0 + 2.12 + 4.3.0 + 2.0.2 + 3.2.12 + 2.12.15 + + + + + com.google.guava + guava + 31.0.1-jre + + + io.circe + circe-generic-extras_${scala.compat.version} + 0.14.1 + + + io.circe + circe-generic_${scala.compat.version} + 0.14.1 + + + + io.circe + circe-parser_${scala.compat.version} + 0.14.1 + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + org.apache.flink + flink-sql-connector-kafka + ${flink.version} + + + org.scala-lang + scala-library + ${scala.version} + + + org.wikimedia + eventutilities + 1.0.9 + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-connector-base + ${flink.version} + provided + + + org.apache.flink + flink-json + ${flink.version} + provided + + + + org.apache.flink + flink-scala_${scala.compat.version} + ${flink.version} + provided + + + + org.apache.flink + flink-state-processor-api + ${flink.version} + provided + + + + + org.apache.flink + flink-statebackend-rocksdb + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-scala_${scala.compat.version} + ${flink.version} + provided + + + org.apache.flink + flink-table + ${flink.version} + pom + provided + + + + org.apache.flink + flink-table-api-scala-bridge_${scala.compat.version} + ${flink.version} + provided + + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + + org.apache.flink + flink-table-planner_${scala.compat.version} + ${flink.version} + provided + + + + org.apache.flink + flink-runtime + ${flink.version} + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test + + + + org.scalatest + scalatest_${scala.compat.version} + ${scala.scalatest.version} + test + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + org.wikimedia.discovery + discovery-maven-tool-configs + ${discovery-maven-tool-configs.version} + + + + + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala.scala-maven-plugin.version} + + + + + -unchecked + -Ywarn-unused-import + -feature + -deprecation + -dependencyfile + ${project.build.directory}/.scala_dependencies + -target:jvm-${maven.compiler.target} + + ${project.build.directory}/sbt_1.0_zinc_org.scala-sbt + + + + + compile + testCompile + + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala.scala-maven-plugin.version} + + + + compile + testCompile + + + + -dependencyfile + ${project.build.directory}/.scala_dependencies + + + -target:jvm-${maven.compiler.target} + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + jar-with-dependencies + + single + + package + + + jar-with-dependencies-spi-compliant + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + true + + + + + org.basepom.maven + duplicate-finder-maven-plugin + + + org.apache.spark.unused.UnusedStubClass + + + git.properties + + log4j2-test.properties + + + + + org.scala-lang + scala-library + + + org.apache.flink + flink-runtime + + + org.apache.flink + flink-table-runtime + + + org.apache.flink + flink-connector-kafka + + + org.apache.flink + flink-sql-connector-kafka + + + + + + org.scalastyle + scalastyle-maven-plugin + + + + org.scalatest + scalatest-maven-plugin + ${scala.scalatest-maven-plugin.version} + + ${project.build.directory}/surefire-reports + . + WDF TestSuite.txt + + + + test + + test + + + + + + + diff --git a/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala new file mode 100644 index 0000000000000000000000000000000000000000..158090d8912e0b38e1a167b56da033ca5927f8e6 --- /dev/null +++ b/src/main/scala/org/wikimedia/dataplatform/Enrichment.scala @@ -0,0 +1,10 @@ +package org.wikimedia.dataplatform + +import org.apache.flink.streaming.api.scala._ + +object Enrichment { + def main(args: Array[String]): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.execute + } +} diff --git a/src/test/scala/EnrichmentSuite.scala b/src/test/scala/EnrichmentSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..bf20ac1f80974341d09e8feb33835541df1422d0 --- /dev/null +++ b/src/test/scala/EnrichmentSuite.scala @@ -0,0 +1,61 @@ +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} +import org.apache.flink.test.util.MiniClusterWithClientResource +import org.scalatest.BeforeAndAfter +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.util +import java.util.Collections + +class EnrichmentIntegrationTest extends AnyFlatSpec with Matchers with BeforeAndAfter { + + val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(2) + .setNumberTaskManagers(1) + .build) + + before { + flinkCluster.before() + } + + after { + flinkCluster.after() + } + + + "Identity pipeline" should "return its input" in { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + // configure your test environment + env.setParallelism(2) + + // values are collected in a static variable + CollectSink.values.clear() + + // create a stream of custom elements and apply transformations + env.fromElements(1L, 21L, 22L) + .addSink(new CollectSink()) + + // execute + env.execute() + + // verify your results + CollectSink.values should contain allOf (1L, 21L, 22L) + } +} + +// create a testing sink +class CollectSink extends SinkFunction[Long] { + + override def invoke(value: Long, context: SinkFunction.Context): Unit = { + CollectSink.values.add(value) + } +} + +object CollectSink { + // must be static + val values: util.List[Long] = Collections.synchronizedList(new util.ArrayList()) +}