Flink projects can be built with different build tools. To get started quickly, Flink provides project templates for the following build tools: SBT and Maven.
These templates help you set up the project structure and create the initial build files.
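To create a new SBT project, use one of the following methods.
Use Giter8:
$ g8 tillrohrmann/flink-project
Clone the repository:
$ git clone https://github.com/tillrohrmann/flink-project.git
Run the quickstart script:
$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)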
To build your project, run the sbt clean assembly command.
This creates the fat-jar your-project-name-assembly-0.1-SNAPSHOT.jar in the directory target/scala_your-major-scala-version/.
To run your project, use the sbt run command.
By default, this runs your job in the same JVM that sbt is running in.
To run your job in a separate JVM, add the following line to build.sbt:
fork in run := true
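For orientation, a build.sbt that contains this setting might look like the following minimal sketch. The project name, Scala version, and Flink version are placeholder assumptions rather than values taken from the template, and the generated build.sbt will contain more than this. Newer sbt releases also accept the slash syntax run / fork := true for the same setting.

```scala
// build.sbt: illustrative sketch only; names and versions are assumptions
name := "your-project-name"
version := "0.1-SNAPSHOT"
scalaVersion := "2.11.7"

// Hypothetical Flink version; use the one that matches your cluster
val flinkVersion = "1.3.0"

// Flink dependencies; the generated template may declare these with a different scope
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)

// Run the job in a forked JVM instead of inside the sbt process
fork in run := true
```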
We recommend using IntelliJ for your Flink job development.
To get started, import your newly created project into IntelliJ.
You can do this via File -> New -> Project from Existing Sources... and then choosing your project’s directory.
IntelliJ will then automatically detect the build.sbt file and set everything up.
To run your Flink job, we recommend choosing the mainRunner module as the classpath of your Run/Debug Configuration.
This ensures that all dependencies that are set to provided are available upon execution.
You can configure the Run/Debug Configurations via Run -> Edit Configurations... and then choose mainRunner from the Use classpath of module dropdown box.
To import the newly created project into Eclipse, you first have to create Eclipse project files for it.
These project files can be created via the sbteclipse plugin.
Add the following line to your PROJECT_DIR/project/plugins.sbt file:
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
In sbt, use the following command to create the Eclipse project files:
> eclipse
Now you can import the project into Eclipse via File -> Import... -> Existing Projects into Workspace and then selecting the project directory.
The only requirements are working Maven 3.0.4 (or higher) and Java 7.x (or higher) installations.
Use one of the following commands to create a project:
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
    -DarchetypeVersion=1.3-SNAPSHOT
$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash
There will be a new directory in your working directory. If you’ve used the curl approach, the directory is called quickstart. Otherwise, it has the name of your artifactId:
$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── org
                └── myorg
                    └── quickstart
                        ├── BatchJob.scala
                        ├── SocketTextStreamWordCount.scala
                        ├── StreamingJob.scala
                        └── WordCount.scala
The sample project is a Maven project that contains four classes. StreamingJob and BatchJob are basic skeleton programs, SocketTextStreamWordCount is a working streaming example, and WordCountJob is a working batch example. Please note that the main methods of all classes allow you to start Flink in a development/testing mode.
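As a rough illustration of what a streaming job in such a project can look like, the following sketch counts words read from a socket. It is not the generated SocketTextStreamWordCount file: the object name, the tokenize helper, the host, and the port are assumptions made for this example, and it needs the Flink streaming Scala dependency on the classpath. When started from the IDE, getExecutionEnvironment creates a local environment, which corresponds to the development/testing mode mentioned above.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical object name; not one of the generated classes
object SocketWordCountSketch {

  // Pure helper: lowercase a line and split it into non-empty words
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // Local environment when run from the IDE, cluster environment when submitted
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumed source: text lines from a local socket (e.g. opened with `nc -lk 9999`)
    val text = env.socketTextStream("localhost", 9999)

    // Pair each word with 1, key the stream by the word, and keep a running sum
    val counts = text
      .flatMap { line => tokenize(line) }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)

    counts.print()

    // A streaming job only starts once execute() is called
    env.execute("Socket word count sketch")
  }
}
```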
We recommend you import this project into your IDE. For Eclipse, you need the following plugins, which you can install from the provided Eclipse Update Sites:
The IntelliJ IDE supports Maven out of the box and offers a plugin for Scala development.
If you want to build your project, go to your project directory and issue the mvn clean package -Pbuild-jar command. You will find a jar that runs on every Flink cluster with a compatible version, target/original-your-artifact-id-your-version.jar. There is also a fat-jar in target/your-artifact-id-your-version.jar which additionally contains all dependencies that were added to the Maven project.
Write your application!
The quickstart project contains a WordCount implementation, the “Hello World” of Big Data processing systems. The goal of WordCount is to determine the frequencies of words in a text, e.g., how often the terms “the” or “house” occur in all Wikipedia texts.
Sample Input:
big data is big
Sample Output:
big 2
data 1
is 1
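To make the expected result concrete, the sample above can be reproduced with plain Scala collections. This is only a sketch of the computation and does not use the Flink API; WordCountSketch and countWords are hypothetical names introduced for illustration.

```scala
// Plain Scala illustration of the word count computation (no Flink involved)
object WordCountSketch {

  // Lowercase each line, split it into words, and count the occurrences of each word
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val counts = countWords(Seq("big data is big"))
    // Sort by word so the printed order is deterministic
    counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(s"$word $n") }
  }
}
```

Running main on the sample input prints big 2, data 1, and is 1, one entry per line, which matches the sample output.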
The following code shows the WordCount implementation from the Quickstart, which processes some text lines with two operators (a FlatMap and a Reduce operation that aggregates a sum) and prints the resulting words and counts to standard output.
import org.apache.flink.api.scala._

object WordCountJob {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    // split lines into lowercase words, pair each word with 1, and sum the counts per word
    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // execute the job and print the result
    counts.print()
  }
}
Check GitHub for the full example code.
For a complete overview of our API, have a look at the DataStream API, DataSet API, and Scala API Extensions sections. If you have any trouble, ask on our Mailing List. We are happy to provide help.