Sample Project using the Scala API

Build Tools

Flink projects can be built with different build tools. To get started quickly, Flink provides project templates for the following build tools: SBT and Maven.

These templates help you to set up the project structure and to create the initial build files.

SBT

Create Project

Use one of the following three options to create a project:

$ g8 tillrohrmann/flink-project
This creates a Flink project from the flink-project template in the project directory you specify. If you haven't installed giter8 yet, please follow its installation guide first.
$ git clone https://github.com/tillrohrmann/flink-project.git
This creates the Flink project in the directory flink-project.
$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
This creates a Flink project in the specified project directory.

Build Project

To build your project, run the sbt clean assembly command. This creates the fat-jar your-project-name-assembly-0.1-SNAPSHOT.jar in the directory target/scala_your-major-scala-version/.

Run Project

To run your project, issue the sbt run command.

By default, this runs your job in the same JVM as sbt. To run your job in a separate JVM, add the following line to build.sbt:

fork in run := true
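The line above uses sbt's older `in` scoping syntax. If your project is on sbt 1.x (an assumption; check project/build.properties), the same setting can also be written with the slash syntax. In this sketch the `javaOptions` line is a hypothetical illustration: sbt only passes `javaOptions` to a forked JVM, so flags like a heap size take effect only once forking is on.

```scala
// build.sbt (sbt 1.x slash syntax): run the job in a separate JVM
run / fork := true

// Hypothetical example: JVM options are only applied to the forked process
run / javaOptions ++= Seq("-Xmx2g")
```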

IntelliJ

We recommend using IntelliJ for your Flink job development. In order to get started, you have to import your newly created project into IntelliJ. You can do this via File -> New -> Project from Existing Sources... and then choosing your project’s directory. IntelliJ will then automatically detect the build.sbt file and set everything up.

To run your Flink job from IntelliJ, we recommend choosing the mainRunner module as the classpath of your Run/Debug Configuration. This ensures that all dependencies which are set to provided are available at execution time. You can configure Run/Debug Configurations via Run -> Edit Configurations... and then choose mainRunner from the Use classpath of module drop-down box.
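For context, a provided dependency is declared in build.sbt roughly as sketched below: it is on the compile classpath but is left out of the fat-jar, because the Flink cluster already ships these classes. That is why a run from the IDE needs a classpath, such as mainRunner's, that puts them back. The version number and the exact artifact list are assumptions; keep whatever your template generated.

```scala
// build.sbt fragment (sketch): Flink's core APIs are supplied by the cluster,
// so they are marked "provided" and excluded from the assembled fat-jar.
val flinkVersion = "1.3.0" // assumption: match your cluster's Flink version

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
```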

Eclipse

In order to import the newly created project into Eclipse, you first have to create Eclipse project files for it. These project files can be created via the sbteclipse plugin. Add the following line to your PROJECT_DIR/project/plugins.sbt file:

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

In sbt, use the following command to create the Eclipse project files:

> eclipse

Now you can import the project into Eclipse via File -> Import... -> Existing Projects into Workspace and then select the project directory.

Maven

Requirements

The only requirements are working Maven 3.0.4 (or higher) and Java 7.x (or higher) installations.

Create Project

Use one of the following commands to create a project:

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-scala     \
      -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
      -DarchetypeVersion=1.3-SNAPSHOT
This allows you to name your newly created project. It will interactively ask you for the groupId, artifactId, and package name.
$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash

Inspect Project

There will be a new directory in your working directory. If you’ve used the curl approach, the directory is called quickstart. Otherwise, it has the name of your artifactId:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── org
                └── myorg
                    └── quickstart
                        ├── BatchJob.scala
                        ├── SocketTextStreamWordCount.scala
                        ├── StreamingJob.scala
                        └── WordCount.scala

The sample project is a Maven project that contains four classes. StreamingJob and BatchJob are basic skeleton programs, SocketTextStreamWordCount is a working streaming example, and WordCount is a working batch example. Note that the main method of each class lets you start Flink in a development/testing mode.
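To give an idea of what a streaming word count looks like, here is a minimal sketch against the Scala DataStream API. It is not the quickstart's SocketTextStreamWordCount verbatim; the object name, the host localhost, and the port 9999 are assumptions (a text server can be started with, e.g., `nc -lk 9999`). The tokenizer is factored out so it can be checked on its own.

```scala
import org.apache.flink.streaming.api.scala._

// Sketch of a socket-based streaming word count (hypothetical names, not the quickstart code).
// Assumption: a text server is listening on localhost:9999.
object SocketWordCountSketch {

  // Lower-case a line and split it on non-word characters. split() yields an
  // empty leading token when a line starts with punctuation, so drop empties.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").toSeq.filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val counts = env
      .socketTextStream("localhost", 9999)
      .flatMap(line => tokenize(line))
      .map(word => (word, 1))
      .keyBy(0) // key by the word (tuple field 0)
      .sum(1)   // running sum of the counts (tuple field 1)

    counts.print()

    // Unlike print() in the DataSet API, a streaming job only starts on execute()
    env.execute("Socket word count sketch")
  }
}
```

Each incoming line updates the running count of its words, so the same word is printed repeatedly with an increasing count rather than once at the end.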

We recommend you import this project into your IDE. For Eclipse, you need the following plugins, which you can install from the provided Eclipse Update Sites:

The IntelliJ IDE supports Maven out of the box and offers a plugin for Scala development.

Build Project

To build your project, go to your project directory and run the mvn clean package -Pbuild-jar command. You will find a jar that runs on every Flink cluster with a compatible version: target/original-your-artifact-id-your-version.jar. There is also a fat-jar, target/your-artifact-id-your-version.jar, which additionally contains all dependencies that were added to the Maven project.

Next Steps

Write your application!

The quickstart project contains a WordCount implementation, the “Hello World” of Big Data processing systems. The goal of WordCount is to determine the frequencies of words in a text, e.g., how often the terms “the” or “house” occur in all Wikipedia texts.

Sample Input:

big data is big

Sample Output:

big 2
data 1
is 1
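The semantics of WordCount can be illustrated without Flink at all. This plain-Scala sketch (the object and method names are hypothetical) uses ordinary collections to apply the same steps as the Flink job further down: lower-case each line, split on non-word characters, and count each word.

```scala
// Plain Scala collections, no Flink: a sketch of WordCount's semantics only.
object WordCountSemantics {

  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+")) // split each line into words
      .filter(_.nonEmpty)                   // drop empty tokens from leading punctuation
      .groupBy(identity)                    // group identical words together
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit = {
    val counts = countWords(Seq("big data is big"))
    // Sort by word so the output order is deterministic
    counts.toSeq.sortBy(_._1).foreach { case (word, count) => println(s"$word $count") }
  }
}
```

Running main prints the three lines of the sample output above. In Flink the grouping and summing are distributed across the cluster, but the result is the same.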

The following code shows the WordCount implementation from the Quickstart, which processes some text lines with two operators (a FlatMap and a Reduce operation via aggregating a sum) and prints the resulting words and counts to stdout.

import org.apache.flink.api.scala._

object WordCountJob {
  def main(args: Array[String]): Unit = {

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    // split lines into lower-case words, drop empty tokens, and count each word
    val counts = text.flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // print the result; in the DataSet API, print() also triggers job execution
    counts.print()
  }
}

Check GitHub for the full example code.
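If you want to check the result programmatically instead of reading stdout (for example in a unit test), one option is to factor the pipeline into a function and fetch the result with collect(), which runs the job and returns the data to the client as a Seq. A sketch under that assumption; the names WordCountCheck and wordCount are hypothetical.

```scala
import org.apache.flink.api.scala._

// Sketch: the WordCount pipeline as a reusable function whose result can be
// inspected with collect() rather than printed.
object WordCountCheck {

  def wordCount(text: DataSet[String]): DataSet[(String, Int)] =
    text
      .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    // collect() executes the job and ships the (word, count) pairs back to the client
    val counts: Map[String, Int] = wordCount(text).collect().toMap
    println(counts("to")) // 4 for these input lines
  }
}
```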

For a complete overview of our API, have a look at the DataStream API, DataSet API, and Scala API Extensions sections. If you have any trouble, ask on our Mailing List; we are happy to help.